Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 59e422ec4 -> e673a6ff7
AMBARI-18003. Hive view 1.5.0 shows error for previous invalid queries in the logs of any subsequent query. (dipayanb) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e673a6ff Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e673a6ff Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e673a6ff Branch: refs/heads/branch-2.4 Commit: e673a6ff7817ab11b15c0777b4462105a7fedae7 Parents: 59e422e Author: Dipayan Bhowmick <[email protected]> Authored: Wed Aug 3 16:40:37 2016 +0530 Committer: Dipayan Bhowmick <[email protected]> Committed: Thu Aug 4 11:37:56 2016 +0530 ---------------------------------------------------------------------- .../apache/ambari/view/hive2/actor/JdbcConnector.java | 12 ++++++++++++ .../ambari/view/hive2/actor/StatementExecutor.java | 4 ++++ .../ambari/view/hive2/actor/YarnAtsGUIDFetcher.java | 8 ++++++++ 3 files changed, 24 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e673a6ff/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java index 8f21667..8d64c9b 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java @@ -270,6 +270,7 @@ public class JdbcConnector extends HiveActor { private void processResult(Optional<ResultSet> resultSetOptional) { executing = false; + LOG.info("Finished processing SQL statements for Job id : {}", jobId.or("SYNC JOB")); if (isAsync() && jobId.isPresent()) { updateJobStatus(jobId.get(), 
Job.JOB_STATE_FINISHED); } @@ -316,6 +317,8 @@ public class JdbcConnector extends HiveActor { executionType = message.getType(); commandSender = getSender(); + resetToInitialState(); + if (!checkConnection()) return; for (String statement : message.getStatements()) { @@ -346,6 +349,7 @@ public class JdbcConnector extends HiveActor { private void runGetMetaData(GetColumnMetadataJob message) { if (!checkConnection()) return; + resetToInitialState(); executing = true; executionType = message.getType(); commandSender = getSender(); @@ -417,6 +421,7 @@ public class JdbcConnector extends HiveActor { JobImpl job = storage.load(JobImpl.class, jobid); job.setStatus(status); storage.store(JobImpl.class, job); + LOG.info("Stored job status for Job id: {} as '{}'", jobid, status); } catch (ItemNotFound itemNotFound) { // Cannot do anything } @@ -511,6 +516,13 @@ public class JdbcConnector extends HiveActor { } } + private void resetToInitialState() { + isFailure = false; + failure = null; + resultSetIterator = null; + isCancelCalled = false; + } + @Override public void postStop() throws Exception { stopInactivityScheduler(); http://git-wip-us.apache.org/repos/asf/ambari/blob/e673a6ff/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java index 6d77180..5b4f76c 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java @@ -84,7 +84,9 @@ public class StatementExecutor extends HiveActor { if (message.shouldStartGUIDFetch() && message.getJobId().isPresent()) { startGUIDFetch(statement, message.getJobId().get()); } + 
LOG.info("Statement executor is executing statement: {}, Statement id: {}, JobId: {}", message.getStatement(), message.getId(), message.getJobId().or("SYNC JOB")); Optional<ResultSet> resultSetOptional = connectionDelegate.execute(message.getStatement()); + LOG.info("Finished executing statement: {}, Statement id: {}, JobId: {}", message.getStatement(), message.getId(), message.getJobId().or("SYNC JOB")); if (resultSetOptional.isPresent()) { sender().tell(new ResultInformation(message.getId(), resultSetOptional.get()), self()); @@ -105,6 +107,7 @@ public class StatementExecutor extends HiveActor { guidFetcher = getContext().actorOf(Props.create(YarnAtsGUIDFetcher.class, storage) .withDispatcher("akka.actor.misc-dispatcher"), "YarnAtsGUIDFetcher:" + UUID.randomUUID().toString()); } + LOG.info("Fetching guid for Job Id: {}", jobId); guidFetcher.tell(new UpdateYarnAtsGuid(statement, jobId), self()); } @@ -121,6 +124,7 @@ public class StatementExecutor extends HiveActor { Props.create(LogAggregator.class, hdfsApi, statement, logFile) .withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString()); } + LOG.info("Fetching query logs for statement: {}", sqlStatement); logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf()); } http://git-wip-us.apache.org/repos/asf/ambari/blob/e673a6ff/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java index 40b84c4..4bcb815 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java @@ -24,6 +24,8 @@ import 
org.apache.ambari.view.hive2.persistence.Storage; import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound; import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl; import org.apache.hive.jdbc.HiveStatement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; import java.util.concurrent.TimeUnit; @@ -34,6 +36,8 @@ import java.util.concurrent.TimeUnit; */ public class YarnAtsGUIDFetcher extends HiveActor { + private final Logger LOG = LoggerFactory.getLogger(getClass()); + private final Storage storage; public YarnAtsGUIDFetcher(Storage storage) { @@ -53,8 +57,11 @@ public class YarnAtsGUIDFetcher extends HiveActor { String jobId = message.getJobId(); String yarnAtsGuid = statement.getYarnATSGuid(); + LOG.info("Fetched guid: {}, for job id: {}", yarnAtsGuid, jobId); + // If ATS GUID is not yet generated, we will retry after 1 second if(yarnAtsGuid == null) { + LOG.info("Retrying to fetch guid"); getContext().system().scheduler() .scheduleOnce(Duration.create(1, TimeUnit.SECONDS), getSelf(), message, getContext().dispatcher(), null); } else { @@ -62,6 +69,7 @@ public class YarnAtsGUIDFetcher extends HiveActor { JobImpl job = storage.load(JobImpl.class, jobId); job.setGuid(yarnAtsGuid); storage.store(JobImpl.class, job); + LOG.info("Stored guid: {} for job id: {} in database", yarnAtsGuid, jobId); } catch (ItemNotFound itemNotFound) { // Cannot do anything if the job is not present }
