This is an automated email from the ASF dual-hosted git repository.

nitiraj pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new 0d7be53 AMBARI-24808 : Logs are not showing in Hiveview2 for hive query. (nitirajrathore) (#2495) 0d7be53 is described below commit 0d7be53e215270d19a0eb480214a43ee3a33e2dc Author: nitirajrathore <nitiraj.rath...@gmail.com> AuthorDate: Wed Oct 24 12:01:41 2018 +0530 AMBARI-24808 : Logs are not showing in Hiveview2 for hive query. (nitirajrathore) (#2495) --- .../ambari/view/hive2/actor/JdbcConnector.java | 8 ++- .../ambari/view/hive2/actor/LogAggregator.java | 65 ++++++++++---------- .../ambari/view/hive2/actor/StatementExecutor.java | 10 +-- .../src/main/resources/ui/hive-web/package.json | 2 +- .../src/main/resources/ui/hive-web/yarn.lock | 6 +- .../ambari/view/hive20/actor/JdbcConnector.java | 20 +++--- .../ambari/view/hive20/actor/LogAggregator.java | 71 +++++++++++----------- .../view/hive20/actor/StatementExecutor.java | 11 +--- 8 files changed, 97 insertions(+), 96 deletions(-) diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java index f7746d9..df76798 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java @@ -323,7 +323,7 @@ public class JdbcConnector extends HiveActor { int index = statementsCount - statementQueue.size(); String statement = statementQueue.poll(); if (statementExecutor == null) { - statementExecutor = getStatementExecutor(); + statementExecutor = createStatementExecutor(); } if (isAsync()) { @@ -378,11 +378,13 @@ public class JdbcConnector extends HiveActor { executing = true; executionType = message.getType(); commandSender = getSender(); - statementExecutor = getStatementExecutor(); + if(null == statementExecutor) { + statementExecutor = createStatementExecutor(); + } 
statementExecutor.tell(message, self()); } - private ActorRef getStatementExecutor() { + private ActorRef createStatementExecutor() { return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate) .withDispatcher("akka.actor.result-dispatcher"), "StatementExecutor:" + UUID.randomUUID().toString()); diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java index 69b4a56..aaba20e 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java @@ -21,6 +21,9 @@ package org.apache.ambari.view.hive2.actor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Cancellable; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import org.apache.ambari.view.hive2.actor.message.GetMoreLogs; import org.apache.ambari.view.hive2.actor.message.HiveMessage; @@ -30,6 +33,7 @@ import org.apache.ambari.view.utils.hdfs.HdfsApi; import org.apache.ambari.view.utils.hdfs.HdfsApiException; import org.apache.ambari.view.utils.hdfs.HdfsUtil; import org.apache.commons.lang.StringUtils; +import org.apache.hive.jdbc.ClosedOrCancelledStatementException; import org.apache.hive.jdbc.HiveStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +48,7 @@ import java.util.concurrent.TimeUnit; */ public class LogAggregator extends HiveActor { - private final Logger LOG = LoggerFactory.getLogger(getClass()); + private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); public static final int AGGREGATION_INTERVAL = 5 * 1000; private final HdfsApi hdfsApi; @@ -53,8 +57,6 @@ public 
class LogAggregator extends HiveActor { private Cancellable moreLogsScheduler; private ActorRef parent; - private boolean hasStartedFetching = false; - private boolean shouldFetchMore = true; private String allLogs = ""; public LogAggregator(HdfsApi hdfsApi, String logFile) { @@ -70,56 +72,57 @@ public class LogAggregator extends HiveActor { } if (message instanceof GetMoreLogs) { - try { - getMoreLogs(); - } catch (SQLException e) { - LOG.error("SQL Error while getting logs. Tried writing to: {}. Exception: {}", logFile, e); - } catch (HdfsApiException e) { - LOG.warn("HDFS Error while getting writing logs to {}. Exception: {}", logFile, e); - } + getMoreLogs(); } } private void start(StartLogAggregation message) { + if (null != this.statement) { + LOG.debug("fetching logs for previous statement before switching to the new one. for logFile: {} by actor: {}", logFile, getSelf()); + getMoreLogs(); + } this.statement = message.getHiveStatement(); + LOG.debug("Starting to fetch logs for logFile: {} by actor: {}", logFile, getSelf()); parent = this.getSender(); - hasStartedFetching = false; - shouldFetchMore = true; String logTitle = "Logs for Query '" + message.getStatement() + "'"; String repeatSeperator = StringUtils.repeat("=", logTitle.length()); allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator); - if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) { - moreLogsScheduler.cancel(); + if (moreLogsScheduler == null) { + setupScheduler(); } + } + + @VisibleForTesting + private void setupScheduler() { this.moreLogsScheduler = getContext().system().scheduler().schedule( - Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS), - this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null); + Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS), + this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null); } - private void getMoreLogs() throws 
SQLException, HdfsApiException { - List<String> logs = statement.getQueryLog(); - if (logs.size() > 0 && shouldFetchMore) { - allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs); - HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs); - if(!statement.hasMoreLogs()) { - shouldFetchMore = false; - } - } else { - // Cancel the timer only when log fetching has been started - if(!shouldFetchMore) { - moreLogsScheduler.cancel(); - parent.tell(new LogAggregationFinished(), ActorRef.noSender()); + private void getMoreLogs() { + LOG.debug("fetching more logs for : {}", logFile); + if (null != this.statement) { + List<String> logs; + try { + logs = this.statement.getQueryLog(); + if (!statement.hasMoreLogs()) { + allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs); + HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs); + // Cancel the timer only when log fetching has been started + } + } catch (Exception e) { + LOG.error("Error occurred while fetching logs for logFile : {}", logFile, e); } } } @Override public void postStop() throws Exception { + LOG.info("Stopping logaggregator after fetching the logs one last time : for logFile {}", logFile); + getMoreLogs(); if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) { moreLogsScheduler.cancel(); } - } - } diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java index 6cdee81..b067704 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java @@ -126,16 +126,10 @@ public class StatementExecutor extends HiveActor { logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf()); } - private void stopLogAggregation() { - if (logAggregator != null) { - 
getContext().stop(logAggregator); - } - logAggregator = null; - } - @Override public void postStop() throws Exception { - stopLogAggregation(); + LOG.info("stopping StatementExecutor : {}", getSelf()); + this.logAggregator = null; } private void getColumnMetaData(GetColumnMetadataJob message) { diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json b/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json index 545cefd..aa23579 100644 --- a/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json +++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json @@ -21,7 +21,7 @@ "license": "MIT", "devDependencies": { "body-parser": "^1.2.0", - "bower": ">= 1.3.12", + "bower": "1.8.4", "broccoli-asset-rev": "^2.0.0", "ember-ajax": "^2.0.1", "broccoli-sass": "^0.6.3", diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock b/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock index 372fe54..5e0a58b 100644 --- a/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock +++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock @@ -483,9 +483,9 @@ bower-config@0.5.2: optimist "~0.6.0" osenv "0.0.3" -"bower@>= 1.3.12", bower@^1.3.12: - version "1.8.0" - resolved "https://registry.yarnpkg.com/bower/-/bower-1.8.0.tgz#55dbebef0ad9155382d9e9d3e497c1372345b44a" +bower@1.8.4, bower@^1.3.12: + version "1.8.4" + resolved "https://registry.yarnpkg.com/bower/-/bower-1.8.4.tgz#e7876a076deb8137f7d06525dc5e8c66db82f28a" brace-expansion@^1.0.0: version "1.1.7" diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java index 5a744e2..9117ef5 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java @@ 
-354,7 +354,7 @@ public class JdbcConnector extends HiveActor { int index = statementsCount - statementQueue.size(); String statement = statementQueue.poll(); if (statementExecutor == null) { - statementExecutor = getStatementExecutor(); + statementExecutor = createStatementExecutor(); } if (isAsync()) { @@ -409,7 +409,9 @@ public class JdbcConnector extends HiveActor { executing = true; executionType = message.getType(); commandSender = getSender(); - statementExecutor = getStatementExecutor(); + if (statementExecutor == null) { + statementExecutor = createStatementExecutor(); + } statementExecutor.tell(message, self()); } @@ -419,14 +421,18 @@ public class JdbcConnector extends HiveActor { executing = true; executionType = message.getType(); commandSender = getSender(); - statementExecutor = getStatementExecutor(); + if (statementExecutor == null) { + statementExecutor = createStatementExecutor(); + } statementExecutor.tell(message, self()); } - private ActorRef getStatementExecutor() { - return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate) - .withDispatcher("akka.actor.result-dispatcher"), - "StatementExecutor:" + UUID.randomUUID().toString()); + private ActorRef createStatementExecutor() { + ActorRef actorRef = getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate) + .withDispatcher("akka.actor.result-dispatcher"), + "StatementExecutor:" + UUID.randomUUID().toString()); + LOG.debug("Created new statement executor: {}", actorRef); + return actorRef; } private boolean isAsync() { diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java index 2c8a65d..d198965 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java +++ 
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java @@ -20,21 +20,19 @@ package org.apache.ambari.view.hive20.actor; import akka.actor.ActorRef; import akka.actor.Cancellable; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import org.apache.ambari.view.hive20.actor.message.GetMoreLogs; import org.apache.ambari.view.hive20.actor.message.HiveMessage; -import org.apache.ambari.view.hive20.actor.message.LogAggregationFinished; import org.apache.ambari.view.hive20.actor.message.StartLogAggregation; import org.apache.ambari.view.utils.hdfs.HdfsApi; -import org.apache.ambari.view.utils.hdfs.HdfsApiException; import org.apache.ambari.view.utils.hdfs.HdfsUtil; import org.apache.commons.lang3.StringUtils; import org.apache.hive.jdbc.HiveStatement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; -import java.sql.SQLException; import java.util.List; import java.util.concurrent.TimeUnit; @@ -43,7 +41,7 @@ import java.util.concurrent.TimeUnit; */ public class LogAggregator extends HiveActor { - private final Logger LOG = LoggerFactory.getLogger(getClass()); + private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); public static final int AGGREGATION_INTERVAL = 5 * 1000; private final HdfsApi hdfsApi; @@ -52,8 +50,6 @@ public class LogAggregator extends HiveActor { private Cancellable moreLogsScheduler; private ActorRef parent; - private boolean hasStartedFetching = false; - private boolean shouldFetchMore = true; private String allLogs = ""; public LogAggregator(HdfsApi hdfsApi, String logFile) { @@ -69,57 +65,62 @@ public class LogAggregator extends HiveActor { } if (message instanceof GetMoreLogs) { - try { - getMoreLogs(); - } catch (SQLException e) { - LOG.warn("SQL Error while getting logs. Tried writing to: {}. 
Exception: {}", logFile, e.getMessage()); - } catch (HdfsApiException e) { - LOG.warn("HDFS Error while writing logs to {}. Exception: {}", logFile, e.getMessage()); - - } + getMoreLogs(); } } private void start(StartLogAggregation message) { + + if (null != this.statement) { + LOG.debug("fetching logs for previous statement before switching to the new one. for {}", getSelf()); + getMoreLogs(); + } this.statement = message.getHiveStatement(); parent = this.getSender(); - hasStartedFetching = false; - shouldFetchMore = true; String logTitle = "Logs for Query '" + message.getStatement() + "'"; String repeatSeperator = StringUtils.repeat("=", logTitle.length()); allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator); - if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) { - moreLogsScheduler.cancel(); + if( null == moreLogsScheduler) { + setupScheduler(); } + } + + @VisibleForTesting + protected void setupScheduler() { this.moreLogsScheduler = getContext().system().scheduler().schedule( - Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS), - this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null); + Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS), + getSelf(), new GetMoreLogs(), getContext().dispatcher(), null); } - private void getMoreLogs() throws SQLException, HdfsApiException { - List<String> logs = statement.getQueryLog(); - if (logs.size() > 0 && shouldFetchMore) { - allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs); - HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs); - if(!statement.hasMoreLogs()) { - shouldFetchMore = false; + + private void getMoreLogs() { + LOG.debug("fetching more logs for : {}", getSelf()); + if ((null != this.statement)){ + List<String> logs; + try{ + logs = this.statement.getQueryLog(); + LOG.debug("got more logs : {} for : {}", logs, getSelf()); + if (logs.size() > 0){ + this.allLogs = (this.allLogs 
+ "\n" + Joiner.on("\n").skipNulls().join(logs)); + HdfsUtil.putStringToFile(this.hdfsApi, this.logFile, this.allLogs); + } } - } else { - // Cancel the timer only when log fetching has been started - if(!shouldFetchMore) { - moreLogsScheduler.cancel(); - parent.tell(new LogAggregationFinished(), ActorRef.noSender()); + catch (Exception e){ + LOG.error("Error occurred while fetching logs for : {}", getSelf(), e); } } } @Override public void postStop() throws Exception { + LOG.debug("Stopping logaggregator after fetching the logs one last time : {}", getSelf()); + + getMoreLogs(); + if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) { moreLogsScheduler.cancel(); } - } } diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java index d73a284..333991c 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java @@ -130,16 +130,11 @@ public class StatementExecutor extends HiveActor { logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf()); } - private void stopLogAggregation() { - if (logAggregator != null) { - getContext().stop(logAggregator); - } - logAggregator = null; - } - @Override public void postStop() throws Exception { - stopLogAggregation(); + LOG.info("stopping StatementExecutor : {}", getSelf()); + this.logAggregator = null; + }