This is an automated email from the ASF dual-hosted git repository.
nitiraj pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 0d7be53 AMBARI-24808 : Logs are not showing in Hiveview2 for hive
query. (nitirajrathore) (#2495)
0d7be53 is described below
commit 0d7be53e215270d19a0eb480214a43ee3a33e2dc
Author: nitirajrathore <[email protected]>
AuthorDate: Wed Oct 24 12:01:41 2018 +0530
AMBARI-24808 : Logs are not showing in Hiveview2 for hive query.
(nitirajrathore) (#2495)
---
.../ambari/view/hive2/actor/JdbcConnector.java | 8 ++-
.../ambari/view/hive2/actor/LogAggregator.java | 65 ++++++++++----------
.../ambari/view/hive2/actor/StatementExecutor.java | 10 +--
.../src/main/resources/ui/hive-web/package.json | 2 +-
.../src/main/resources/ui/hive-web/yarn.lock | 6 +-
.../ambari/view/hive20/actor/JdbcConnector.java | 20 +++---
.../ambari/view/hive20/actor/LogAggregator.java | 71 +++++++++++-----------
.../view/hive20/actor/StatementExecutor.java | 11 +---
8 files changed, 97 insertions(+), 96 deletions(-)
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
index f7746d9..df76798 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
@@ -323,7 +323,7 @@ public class JdbcConnector extends HiveActor {
int index = statementsCount - statementQueue.size();
String statement = statementQueue.poll();
if (statementExecutor == null) {
- statementExecutor = getStatementExecutor();
+ statementExecutor = createStatementExecutor();
}
if (isAsync()) {
@@ -378,11 +378,13 @@ public class JdbcConnector extends HiveActor {
executing = true;
executionType = message.getType();
commandSender = getSender();
- statementExecutor = getStatementExecutor();
+ if(null == statementExecutor) {
+ statementExecutor = createStatementExecutor();
+ }
statementExecutor.tell(message, self());
}
- private ActorRef getStatementExecutor() {
+ private ActorRef createStatementExecutor() {
return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi,
storage, connectable.getConnection().get(), connectionDelegate)
.withDispatcher("akka.actor.result-dispatcher"),
"StatementExecutor:" + UUID.randomUUID().toString());
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
index 69b4a56..aaba20e 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
@@ -21,6 +21,9 @@ package org.apache.ambari.view.hive2.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.apache.ambari.view.hive2.actor.message.GetMoreLogs;
import org.apache.ambari.view.hive2.actor.message.HiveMessage;
@@ -30,6 +33,7 @@ import org.apache.ambari.view.utils.hdfs.HdfsApi;
import org.apache.ambari.view.utils.hdfs.HdfsApiException;
import org.apache.ambari.view.utils.hdfs.HdfsUtil;
import org.apache.commons.lang.StringUtils;
+import org.apache.hive.jdbc.ClosedOrCancelledStatementException;
import org.apache.hive.jdbc.HiveStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +48,7 @@ import java.util.concurrent.TimeUnit;
*/
public class LogAggregator extends HiveActor {
- private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private final LoggingAdapter LOG = Logging.getLogger(getContext().system(),
this);
public static final int AGGREGATION_INTERVAL = 5 * 1000;
private final HdfsApi hdfsApi;
@@ -53,8 +57,6 @@ public class LogAggregator extends HiveActor {
private Cancellable moreLogsScheduler;
private ActorRef parent;
- private boolean hasStartedFetching = false;
- private boolean shouldFetchMore = true;
private String allLogs = "";
public LogAggregator(HdfsApi hdfsApi, String logFile) {
@@ -70,56 +72,57 @@ public class LogAggregator extends HiveActor {
}
if (message instanceof GetMoreLogs) {
- try {
- getMoreLogs();
- } catch (SQLException e) {
- LOG.error("SQL Error while getting logs. Tried writing to: {}.
Exception: {}", logFile, e);
- } catch (HdfsApiException e) {
- LOG.warn("HDFS Error while getting writing logs to {}. Exception: {}",
logFile, e);
- }
+ getMoreLogs();
}
}
private void start(StartLogAggregation message) {
+ if (null != this.statement) {
+ LOG.debug("fetching logs for previous statement before switching to the
new one. for logFile: {} by actor: {}", logFile, getSelf());
+ getMoreLogs();
+ }
this.statement = message.getHiveStatement();
+ LOG.debug("Starting to fetch logs for logFile: {} by actor: {}", logFile,
getSelf());
parent = this.getSender();
- hasStartedFetching = false;
- shouldFetchMore = true;
String logTitle = "Logs for Query '" + message.getStatement() + "'";
String repeatSeperator = StringUtils.repeat("=", logTitle.length());
allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle,
repeatSeperator);
- if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
- moreLogsScheduler.cancel();
+ if (moreLogsScheduler == null) {
+ setupScheduler();
}
+ }
+
+ @VisibleForTesting
+ private void setupScheduler() {
this.moreLogsScheduler = getContext().system().scheduler().schedule(
- Duration.Zero(), Duration.create(AGGREGATION_INTERVAL,
TimeUnit.MILLISECONDS),
- this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
+ Duration.Zero(), Duration.create(AGGREGATION_INTERVAL,
TimeUnit.MILLISECONDS),
+ this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
}
- private void getMoreLogs() throws SQLException, HdfsApiException {
- List<String> logs = statement.getQueryLog();
- if (logs.size() > 0 && shouldFetchMore) {
- allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
- HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
- if(!statement.hasMoreLogs()) {
- shouldFetchMore = false;
- }
- } else {
- // Cancel the timer only when log fetching has been started
- if(!shouldFetchMore) {
- moreLogsScheduler.cancel();
- parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+ private void getMoreLogs() {
+ LOG.debug("fetching more logs for : {}", logFile);
+ if (null != this.statement) {
+ List<String> logs;
+ try {
+ logs = this.statement.getQueryLog();
+ if (!statement.hasMoreLogs()) {
+ allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
+ HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
+ // Cancel the timer only when log fetching has been started
+ }
+ } catch (Exception e) {
+ LOG.error("Error occurred while fetching logs for logFile : {}",
logFile, e);
}
}
}
@Override
public void postStop() throws Exception {
+ LOG.info("Stopping logaggregator after fetching the logs one last time :
for logFile {}", logFile);
+ getMoreLogs();
if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) {
moreLogsScheduler.cancel();
}
-
}
-
}
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
index 6cdee81..b067704 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
@@ -126,16 +126,10 @@ public class StatementExecutor extends HiveActor {
logAggregator.tell(new StartLogAggregation(sqlStatement, statement),
getSelf());
}
- private void stopLogAggregation() {
- if (logAggregator != null) {
- getContext().stop(logAggregator);
- }
- logAggregator = null;
- }
-
@Override
public void postStop() throws Exception {
- stopLogAggregation();
+ LOG.info("stopping StatementExecutor : {}", getSelf());
+ this.logAggregator = null;
}
private void getColumnMetaData(GetColumnMetadataJob message) {
diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json b/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json
index 545cefd..aa23579 100644
--- a/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json
+++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/package.json
@@ -21,7 +21,7 @@
"license": "MIT",
"devDependencies": {
"body-parser": "^1.2.0",
- "bower": ">= 1.3.12",
+ "bower": "1.8.4",
"broccoli-asset-rev": "^2.0.0",
"ember-ajax": "^2.0.1",
"broccoli-sass": "^0.6.3",
diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock b/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock
index 372fe54..5e0a58b 100644
--- a/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock
+++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/yarn.lock
@@ -483,9 +483,9 @@ [email protected]:
optimist "~0.6.0"
osenv "0.0.3"
-"bower@>= 1.3.12", bower@^1.3.12:
- version "1.8.0"
- resolved
"https://registry.yarnpkg.com/bower/-/bower-1.8.0.tgz#55dbebef0ad9155382d9e9d3e497c1372345b44a"
+bower@1.8.4, bower@^1.3.12:
+ version "1.8.4"
+ resolved
"https://registry.yarnpkg.com/bower/-/bower-1.8.4.tgz#e7876a076deb8137f7d06525dc5e8c66db82f28a"
brace-expansion@^1.0.0:
version "1.1.7"
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
index 5a744e2..9117ef5 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java
@@ -354,7 +354,7 @@ public class JdbcConnector extends HiveActor {
int index = statementsCount - statementQueue.size();
String statement = statementQueue.poll();
if (statementExecutor == null) {
- statementExecutor = getStatementExecutor();
+ statementExecutor = createStatementExecutor();
}
if (isAsync()) {
@@ -409,7 +409,9 @@ public class JdbcConnector extends HiveActor {
executing = true;
executionType = message.getType();
commandSender = getSender();
- statementExecutor = getStatementExecutor();
+ if (statementExecutor == null) {
+ statementExecutor = createStatementExecutor();
+ }
statementExecutor.tell(message, self());
}
@@ -419,14 +421,18 @@ public class JdbcConnector extends HiveActor {
executing = true;
executionType = message.getType();
commandSender = getSender();
- statementExecutor = getStatementExecutor();
+ if (statementExecutor == null) {
+ statementExecutor = createStatementExecutor();
+ }
statementExecutor.tell(message, self());
}
- private ActorRef getStatementExecutor() {
- return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi,
storage, connectable.getConnection().get(), connectionDelegate)
- .withDispatcher("akka.actor.result-dispatcher"),
- "StatementExecutor:" + UUID.randomUUID().toString());
+ private ActorRef createStatementExecutor() {
+ ActorRef actorRef =
getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage,
connectable.getConnection().get(), connectionDelegate)
+ .withDispatcher("akka.actor.result-dispatcher"),
+ "StatementExecutor:" + UUID.randomUUID().toString());
+ LOG.debug("Created new statement executor: {}", actorRef);
+ return actorRef;
}
private boolean isAsync() {
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
index 2c8a65d..d198965 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
@@ -20,21 +20,19 @@ package org.apache.ambari.view.hive20.actor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.apache.ambari.view.hive20.actor.message.GetMoreLogs;
import org.apache.ambari.view.hive20.actor.message.HiveMessage;
-import org.apache.ambari.view.hive20.actor.message.LogAggregationFinished;
import org.apache.ambari.view.hive20.actor.message.StartLogAggregation;
import org.apache.ambari.view.utils.hdfs.HdfsApi;
-import org.apache.ambari.view.utils.hdfs.HdfsApiException;
import org.apache.ambari.view.utils.hdfs.HdfsUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hive.jdbc.HiveStatement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
-import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -43,7 +41,7 @@ import java.util.concurrent.TimeUnit;
*/
public class LogAggregator extends HiveActor {
- private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private final LoggingAdapter LOG = Logging.getLogger(getContext().system(),
this);
public static final int AGGREGATION_INTERVAL = 5 * 1000;
private final HdfsApi hdfsApi;
@@ -52,8 +50,6 @@ public class LogAggregator extends HiveActor {
private Cancellable moreLogsScheduler;
private ActorRef parent;
- private boolean hasStartedFetching = false;
- private boolean shouldFetchMore = true;
private String allLogs = "";
public LogAggregator(HdfsApi hdfsApi, String logFile) {
@@ -69,57 +65,62 @@ public class LogAggregator extends HiveActor {
}
if (message instanceof GetMoreLogs) {
- try {
- getMoreLogs();
- } catch (SQLException e) {
- LOG.warn("SQL Error while getting logs. Tried writing to: {}.
Exception: {}", logFile, e.getMessage());
- } catch (HdfsApiException e) {
- LOG.warn("HDFS Error while writing logs to {}. Exception: {}",
logFile, e.getMessage());
-
- }
+ getMoreLogs();
}
}
private void start(StartLogAggregation message) {
+
+ if (null != this.statement) {
+ LOG.debug("fetching logs for previous statement before switching to the
new one. for {}", getSelf());
+ getMoreLogs();
+ }
this.statement = message.getHiveStatement();
parent = this.getSender();
- hasStartedFetching = false;
- shouldFetchMore = true;
String logTitle = "Logs for Query '" + message.getStatement() + "'";
String repeatSeperator = StringUtils.repeat("=", logTitle.length());
allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle,
repeatSeperator);
- if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
- moreLogsScheduler.cancel();
+ if( null == moreLogsScheduler) {
+ setupScheduler();
}
+ }
+
+ @VisibleForTesting
+ protected void setupScheduler() {
this.moreLogsScheduler = getContext().system().scheduler().schedule(
- Duration.Zero(), Duration.create(AGGREGATION_INTERVAL,
TimeUnit.MILLISECONDS),
- this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
+ Duration.Zero(), Duration.create(AGGREGATION_INTERVAL,
TimeUnit.MILLISECONDS),
+ getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
}
- private void getMoreLogs() throws SQLException, HdfsApiException {
- List<String> logs = statement.getQueryLog();
- if (logs.size() > 0 && shouldFetchMore) {
- allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
- HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
- if(!statement.hasMoreLogs()) {
- shouldFetchMore = false;
+
+ private void getMoreLogs() {
+ LOG.debug("fetching more logs for : {}", getSelf());
+ if ((null != this.statement)){
+ List<String> logs;
+ try{
+ logs = this.statement.getQueryLog();
+ LOG.debug("got more logs : {} for : {}", logs, getSelf());
+ if (logs.size() > 0){
+ this.allLogs = (this.allLogs + "\n" +
Joiner.on("\n").skipNulls().join(logs));
+ HdfsUtil.putStringToFile(this.hdfsApi, this.logFile, this.allLogs);
+ }
}
- } else {
- // Cancel the timer only when log fetching has been started
- if(!shouldFetchMore) {
- moreLogsScheduler.cancel();
- parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+ catch (Exception e){
+ LOG.error("Error occurred while fetching logs for : {}", getSelf(),
e);
}
}
}
@Override
public void postStop() throws Exception {
+ LOG.debug("Stopping logaggregator after fetching the logs one last time :
{}", getSelf());
+
+ getMoreLogs();
+
if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) {
moreLogsScheduler.cancel();
}
-
}
}
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
index d73a284..333991c 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
@@ -130,16 +130,11 @@ public class StatementExecutor extends HiveActor {
logAggregator.tell(new StartLogAggregation(sqlStatement, statement),
getSelf());
}
- private void stopLogAggregation() {
- if (logAggregator != null) {
- getContext().stop(logAggregator);
- }
- logAggregator = null;
- }
-
@Override
public void postStop() throws Exception {
- stopLogAggregation();
+ LOG.info("stopping StatementExecutor : {}", getSelf());
+ this.logAggregator = null;
+
}