This is an automated email from the ASF dual-hosted git repository.

nitiraj pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 0d7be53  AMBARI-24808 : Logs are not showing in Hiveview2 for hive query. (nitirajrathore) (#2495)
0d7be53 is described below

commit 0d7be53e215270d19a0eb480214a43ee3a33e2dc
Author: nitirajrathore <nitiraj.rath...@gmail.com>
AuthorDate: Wed Oct 24 12:01:41 2018 +0530

    AMBARI-24808 : Logs are not showing in Hiveview2 for hive query. (nitirajrathore) (#2495)
---
 .../ambari/view/hive2/actor/JdbcConnector.java     |  8 ++-
 .../ambari/view/hive2/actor/LogAggregator.java     | 65 ++++++++++----------
 .../ambari/view/hive2/actor/StatementExecutor.java | 10 +--
 .../src/main/resources/ui/hive-web/package.json    |  2 +-
 .../src/main/resources/ui/hive-web/yarn.lock       |  6 +-
 .../ambari/view/hive20/actor/JdbcConnector.java    | 20 +++---
 .../ambari/view/hive20/actor/LogAggregator.java    | 71 +++++++++++-----------
 .../view/hive20/actor/StatementExecutor.java       | 11 +---
 8 files changed, 97 insertions(+), 96 deletions(-)
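
In substance, the patch makes the same lifecycle change in both view copies: JdbcConnector now creates its StatementExecutor child once and reuses it for later statements (createStatementExecutor() is only called while the reference is null), StatementExecutor.postStop() no longer stops the LogAggregator child, and LogAggregator keeps a single scheduler alive across statements and fetches logs one last time in its own postStop(), so the tail of the query log still reaches HDFS. The bower dependency is also pinned to 1.8.4. Below is a minimal, self-contained sketch of the create-once-and-reuse half of that change; the class names are illustrative, and it assumes Akka 2.5's classic Java API rather than the views' own HiveActor base.

    import java.util.UUID;

    import akka.actor.AbstractActor;
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;

    // Illustrative stand-in for JdbcConnector: the executor child is created
    // lazily on first use and then reused for every subsequent statement,
    // instead of being torn down and recreated per statement.
    public class Connector extends AbstractActor {

      private ActorRef statementExecutor;  // reused, no longer stopped per statement

      private ActorRef createStatementExecutor() {
        // One uniquely named child per connector, as in the patch.
        return getContext().actorOf(Props.create(Executor.class),
            "StatementExecutor:" + UUID.randomUUID());
      }

      @Override
      public Receive createReceive() {
        return receiveBuilder().match(String.class, statement -> {
          if (statementExecutor == null) {        // created once...
            statementExecutor = createStatementExecutor();
          }
          statementExecutor.tell(statement, getSelf());  // ...then reused
        }).build();
      }

      // Trivial executor that just prints what it receives.
      public static class Executor extends AbstractActor {
        @Override
        public Receive createReceive() {
          return receiveBuilder()
              .match(String.class, s -> System.out.println(getSelf() + " executing: " + s))
              .build();
        }
      }

      public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef connector = system.actorOf(Props.create(Connector.class), "connector");
        connector.tell("SELECT 1", ActorRef.noSender());
        connector.tell("SELECT 2", ActorRef.noSender());  // same child handles both
      }
    }

Reusing the executor matters here because stopping it also stopped its LogAggregator child mid-fetch, which is consistent with the missing-logs symptom the ticket describes.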

diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
index f7746d9..df76798 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
@@ -323,7 +323,7 @@ public class JdbcConnector extends HiveActor {
     int index = statementsCount - statementQueue.size();
     String statement = statementQueue.poll();
     if (statementExecutor == null) {
-      statementExecutor = getStatementExecutor();
+      statementExecutor = createStatementExecutor();
     }
 
     if (isAsync()) {
@@ -378,11 +378,13 @@ public class JdbcConnector extends HiveActor {
     executing = true;
     executionType = message.getType();
     commandSender = getSender();
-    statementExecutor = getStatementExecutor();
+    if(null == statementExecutor) {
+      statementExecutor = createStatementExecutor();
+    }
     statementExecutor.tell(message, self());
   }
 
-  private ActorRef getStatementExecutor() {
+  private ActorRef createStatementExecutor() {
     return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate)
       .withDispatcher("akka.actor.result-dispatcher"),
       "StatementExecutor:" + UUID.randomUUID().toString());
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
index 69b4a56..aaba20e 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
@@ -21,6 +21,9 @@ package org.apache.ambari.view.hive2.actor;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Cancellable;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import org.apache.ambari.view.hive2.actor.message.GetMoreLogs;
 import org.apache.ambari.view.hive2.actor.message.HiveMessage;
@@ -30,6 +33,7 @@ import org.apache.ambari.view.utils.hdfs.HdfsApi;
 import org.apache.ambari.view.utils.hdfs.HdfsApiException;
 import org.apache.ambari.view.utils.hdfs.HdfsUtil;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hive.jdbc.ClosedOrCancelledStatementException;
 import org.apache.hive.jdbc.HiveStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +48,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class LogAggregator extends HiveActor {
 
-  private final Logger LOG = LoggerFactory.getLogger(getClass());
+  private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
 
   public static final int AGGREGATION_INTERVAL = 5 * 1000;
   private final HdfsApi hdfsApi;
@@ -53,8 +57,6 @@ public class LogAggregator extends HiveActor {
 
   private Cancellable moreLogsScheduler;
   private ActorRef parent;
-  private boolean hasStartedFetching = false;
-  private boolean shouldFetchMore = true;
   private String allLogs = "";
 
   public LogAggregator(HdfsApi hdfsApi, String logFile) {
@@ -70,56 +72,57 @@ public class LogAggregator extends HiveActor {
     }
 
     if (message instanceof GetMoreLogs) {
-      try {
-        getMoreLogs();
-      } catch (SQLException e) {
-        LOG.error("SQL Error while getting logs. Tried writing to: {}. Exception: {}", logFile, e);
-      } catch (HdfsApiException e) {
-        LOG.warn("HDFS Error while getting writing logs to {}. Exception: {}", logFile, e);
-      }
+      getMoreLogs();
     }
   }
 
   private void start(StartLogAggregation message) {
+    if (null != this.statement) {
+      LOG.debug("fetching logs for previous statement before switching to the new one. for logFile: {} by actor: {}", logFile, getSelf());
+      getMoreLogs();
+    }
     this.statement = message.getHiveStatement();
+    LOG.debug("Starting to fetch logs for logFile: {} by actor: {}", logFile, getSelf());
     parent = this.getSender();
-    hasStartedFetching = false;
-    shouldFetchMore = true;
     String logTitle = "Logs for Query '" + message.getStatement() + "'";
     String repeatSeperator = StringUtils.repeat("=", logTitle.length());
     allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator);
 
-    if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
-      moreLogsScheduler.cancel();
+    if (moreLogsScheduler == null) {
+      setupScheduler();
     }
+  }
+
+  @VisibleForTesting
+  private void setupScheduler() {
     this.moreLogsScheduler = getContext().system().scheduler().schedule(
-      Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
-      this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
+        Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
+        this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
   }
 
-  private void getMoreLogs() throws SQLException, HdfsApiException {
-    List<String> logs = statement.getQueryLog();
-    if (logs.size() > 0 && shouldFetchMore) {
-      allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
-      HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
-      if(!statement.hasMoreLogs()) {
-        shouldFetchMore = false;
-      }
-    } else {
-      // Cancel the timer only when log fetching has been started
-      if(!shouldFetchMore) {
-        moreLogsScheduler.cancel();
-        parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+  private void getMoreLogs() {
+    LOG.debug("fetching more logs for : {}", logFile);
+    if (null != this.statement) {
+      List<String> logs;
+      try {
+        logs = this.statement.getQueryLog();
+        if (!statement.hasMoreLogs()) {
+          allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
+          HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
+          // nothing more will arrive for this statement; the scheduler is cancelled in postStop()
+        }
+      } catch (Exception e) {
+        LOG.error("Error occurred while fetching logs for logFile : {}", logFile, e);
       }
     }
   }
 
   @Override
   public void postStop() throws Exception {
+    LOG.info("Stopping logaggregator after fetching the logs one last time : for logFile {}", logFile);
+    getMoreLogs();
     if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) {
       moreLogsScheduler.cancel();
     }
-
   }
-
 }
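
For reference, the call that setupScheduler() wraps is Akka's classic scheduler().schedule(initialDelay, interval, receiver, message, dispatcher, sender): it delivers GetMoreLogs to the aggregator immediately and then every AGGREGATION_INTERVAL milliseconds, and the returned Cancellable is what postStop() cancels after the final fetch. A self-contained sketch of the same poll-then-drain pattern (hypothetical PollingAggregator, not the Ambari class; assumes Akka 2.5's classic Java API):

    import java.util.concurrent.TimeUnit;

    import akka.actor.AbstractActor;
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Cancellable;
    import akka.actor.Props;
    import scala.concurrent.duration.Duration;

    // Hypothetical aggregator showing the pattern from the patch: a timer
    // sends the actor its own "poll" message, and postStop() drains once more
    // before cancelling the timer so the tail of the output is not lost.
    public class PollingAggregator extends AbstractActor {

      private static final class Poll {}               // stands in for GetMoreLogs
      private static final int INTERVAL_MS = 5 * 1000; // AGGREGATION_INTERVAL in the patch

      private Cancellable scheduler;

      @Override
      public void preStart() {
        // Fire immediately, then every INTERVAL_MS, by messaging ourselves.
        scheduler = getContext().system().scheduler().schedule(
            Duration.Zero(), Duration.create(INTERVAL_MS, TimeUnit.MILLISECONDS),
            getSelf(), new Poll(), getContext().dispatcher(), ActorRef.noSender());
      }

      @Override
      public Receive createReceive() {
        return receiveBuilder().match(Poll.class, p -> fetchMore()).build();
      }

      private void fetchMore() {
        // The real class calls HiveStatement.getQueryLog() here and appends
        // the lines to HDFS via HdfsUtil.putStringToFile(...).
        System.out.println("fetching more logs");
      }

      @Override
      public void postStop() {
        fetchMore();  // one last drain, mirroring LogAggregator.postStop()
        if (scheduler != null && !scheduler.isCancelled()) {
          scheduler.cancel();
        }
      }

      public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef aggregator = system.actorOf(Props.create(PollingAggregator.class));
        Thread.sleep(12_000);     // let a few polls happen
        system.stop(aggregator);  // triggers the final drain in postStop()
        system.terminate();
      }
    }
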
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
index 6cdee81..b067704 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
@@ -126,16 +126,10 @@ public class StatementExecutor extends HiveActor {
     logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf());
   }
 
-  private void stopLogAggregation() {
-    if (logAggregator != null) {
-      getContext().stop(logAggregator);
-    }
-    logAggregator = null;
-  }
-
   @Override
   public void postStop() throws Exception {
-    stopLogAggregation();
+    LOG.info("stopping StatementExecutor : {}", getSelf());
+    this.logAggregator = null;
   }
 
   private void getColumnMetaData(GetColumnMetadataJob message) {
diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json b/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json
index 545cefd..aa23579 100644
--- a/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json
+++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json
@@ -21,7 +21,7 @@
   "license": "MIT",
   "devDependencies": {
     "body-parser": "^1.2.0",
-    "bower": ">= 1.3.12",
+    "bower": "1.8.4",
     "broccoli-asset-rev": "^2.0.0",
     "ember-ajax": "^2.0.1",
     "broccoli-sass": "^0.6.3",
diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock b/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock
index 372fe54..5e0a58b 100644
--- a/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock
+++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock
@@ -483,9 +483,9 @@ bower-config@0.5.2:
     optimist "~0.6.0"
     osenv "0.0.3"
 
-"bower@>= 1.3.12", bower@^1.3.12:
-  version "1.8.0"
-  resolved "https://registry.yarnpkg.com/bower/-/bower-1.8.0.tgz#55dbebef0ad9155382d9e9d3e497c1372345b44a"
+bower@1.8.4, bower@^1.3.12:
+  version "1.8.4"
+  resolved "https://registry.yarnpkg.com/bower/-/bower-1.8.4.tgz#e7876a076deb8137f7d06525dc5e8c66db82f28a"
 
 brace-expansion@^1.0.0:
   version "1.1.7"
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
index 5a744e2..9117ef5 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
@@ -354,7 +354,7 @@ public class JdbcConnector extends HiveActor {
     int index = statementsCount - statementQueue.size();
     String statement = statementQueue.poll();
     if (statementExecutor == null) {
-      statementExecutor = getStatementExecutor();
+      statementExecutor = createStatementExecutor();
     }
 
     if (isAsync()) {
@@ -409,7 +409,9 @@ public class JdbcConnector extends HiveActor {
     executing = true;
     executionType = message.getType();
     commandSender = getSender();
-    statementExecutor = getStatementExecutor();
+    if (statementExecutor == null) {
+      statementExecutor = createStatementExecutor();
+    }
     statementExecutor.tell(message, self());
   }
 
@@ -419,14 +421,18 @@ public class JdbcConnector extends HiveActor {
     executing = true;
     executionType = message.getType();
     commandSender = getSender();
-    statementExecutor = getStatementExecutor();
+    if (statementExecutor == null) {
+      statementExecutor = createStatementExecutor();
+    }
     statementExecutor.tell(message, self());
   }
 
-  private ActorRef getStatementExecutor() {
-    return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate)
-      .withDispatcher("akka.actor.result-dispatcher"),
-      "StatementExecutor:" + UUID.randomUUID().toString());
+  private ActorRef createStatementExecutor() {
+    ActorRef actorRef = getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate)
+            .withDispatcher("akka.actor.result-dispatcher"),
+        "StatementExecutor:" + UUID.randomUUID().toString());
+    LOG.debug("Created new statement executor: {}", actorRef);
+    return actorRef;
   }
 
   private boolean isAsync() {
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
index 2c8a65d..d198965 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
@@ -20,21 +20,19 @@ package org.apache.ambari.view.hive20.actor;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import org.apache.ambari.view.hive20.actor.message.GetMoreLogs;
 import org.apache.ambari.view.hive20.actor.message.HiveMessage;
-import org.apache.ambari.view.hive20.actor.message.LogAggregationFinished;
 import org.apache.ambari.view.hive20.actor.message.StartLogAggregation;
 import org.apache.ambari.view.utils.hdfs.HdfsApi;
-import org.apache.ambari.view.utils.hdfs.HdfsApiException;
 import org.apache.ambari.view.utils.hdfs.HdfsUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hive.jdbc.HiveStatement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
-import java.sql.SQLException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -43,7 +41,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class LogAggregator extends HiveActor {
 
-  private final Logger LOG = LoggerFactory.getLogger(getClass());
+  private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
 
   public static final int AGGREGATION_INTERVAL = 5 * 1000;
   private final HdfsApi hdfsApi;
@@ -52,8 +50,6 @@ public class LogAggregator extends HiveActor {
 
   private Cancellable moreLogsScheduler;
   private ActorRef parent;
-  private boolean hasStartedFetching = false;
-  private boolean shouldFetchMore = true;
   private String allLogs = "";
 
   public LogAggregator(HdfsApi hdfsApi, String logFile) {
@@ -69,57 +65,62 @@ public class LogAggregator extends HiveActor {
     }
 
     if (message instanceof GetMoreLogs) {
-      try {
-        getMoreLogs();
-      } catch (SQLException e) {
-        LOG.warn("SQL Error while getting logs. Tried writing to: {}. Exception: {}", logFile, e.getMessage());
-      } catch (HdfsApiException e) {
-        LOG.warn("HDFS Error while writing logs to {}. Exception: {}", logFile, e.getMessage());
-
-      }
+      getMoreLogs();
     }
   }
 
   private void start(StartLogAggregation message) {
+
+    if (null != this.statement) {
+      LOG.debug("fetching logs for previous statement before switching to the new one. for {}", getSelf());
+      getMoreLogs();
+    }
     this.statement = message.getHiveStatement();
     parent = this.getSender();
-    hasStartedFetching = false;
-    shouldFetchMore = true;
     String logTitle = "Logs for Query '" + message.getStatement() + "'";
     String repeatSeperator = StringUtils.repeat("=", logTitle.length());
     allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator);
 
-    if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
-      moreLogsScheduler.cancel();
+    if( null == moreLogsScheduler) {
+      setupScheduler();
     }
+  }
+
+  @VisibleForTesting
+  protected void setupScheduler() {
     this.moreLogsScheduler = getContext().system().scheduler().schedule(
-      Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
-      this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
+    Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
+    getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
   }
 
-  private void getMoreLogs() throws SQLException, HdfsApiException {
-    List<String> logs = statement.getQueryLog();
-    if (logs.size() > 0 && shouldFetchMore) {
-      allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
-      HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
-      if(!statement.hasMoreLogs()) {
-        shouldFetchMore = false;
+
+  private void getMoreLogs() {
+    LOG.debug("fetching more logs for : {}", getSelf());
+    if ((null != this.statement)){
+      List<String> logs;
+      try{
+        logs = this.statement.getQueryLog();
+        LOG.debug("got more logs : {} for : {}", logs, getSelf());
+        if (logs.size() > 0){
+          this.allLogs = (this.allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs));
+          HdfsUtil.putStringToFile(this.hdfsApi, this.logFile, this.allLogs);
+        }
       }
-    } else {
-      // Cancel the timer only when log fetching has been started
-      if(!shouldFetchMore) {
-        moreLogsScheduler.cancel();
-        parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+      catch (Exception e){
+        LOG.error("Error occurred while fetching logs  for : {}", getSelf(), e);
       }
     }
   }
 
   @Override
   public void postStop() throws Exception {
+    LOG.debug("Stopping logaggregator after fetching the logs one last time : {}", getSelf());
+
+    getMoreLogs();
+
     if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) {
       moreLogsScheduler.cancel();
     }
-
   }
 
 }
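
One difference between the two copies: here in hive20, setupScheduler() is made protected and tagged @VisibleForTesting (in hive-next it stays private, where the annotation has no practical effect), presumably so a test can subclass the actor and take the real timer out of the picture. A hypothetical test double under that assumption, not an actual Ambari test:

    import org.apache.ambari.view.hive20.actor.LogAggregator;
    import org.apache.ambari.view.utils.hdfs.HdfsApi;

    // Hypothetical test double: with setupScheduler() protected, a test can
    // subclass LogAggregator, no-op the timer, and drive log fetching by
    // sending GetMoreLogs messages itself at controlled points.
    public class ManualLogAggregator extends LogAggregator {

      public ManualLogAggregator(HdfsApi hdfsApi, String logFile) {
        super(hdfsApi, logFile);
      }

      @Override
      protected void setupScheduler() {
        // no-op: the test delivers GetMoreLogs explicitly instead of on a timer
      }
    }
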
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
index d73a284..333991c 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
@@ -130,16 +130,11 @@ public class StatementExecutor extends HiveActor {
     logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf());
   }
 
-  private void stopLogAggregation() {
-    if (logAggregator != null) {
-      getContext().stop(logAggregator);
-    }
-    logAggregator = null;
-  }
-
   @Override
   public void postStop() throws Exception {
-    stopLogAggregation();
+    LOG.info("stopping StatementExecutor : {}", getSelf());
+    this.logAggregator = null;
+
   }
 
 
