Repository: ambari Updated Branches: refs/heads/branch-2.4 88fc02338 -> 92efc25eb
AMBARI-17269. INSERT and SELECT not working. jobid was used in actor names to make it unique but jobid is not a unique identifier across instances. (dipayanb) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/92efc25e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/92efc25e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/92efc25e Branch: refs/heads/branch-2.4 Commit: 92efc25eb7063af57552e3061620252cde38228e Parents: 88fc023 Author: Dipayan Bhowmick <[email protected]> Authored: Thu Jun 16 15:22:42 2016 +0530 Committer: Dipayan Bhowmick <[email protected]> Committed: Fri Jun 17 07:57:00 2016 +0530 ---------------------------------------------------------------------- .../ambari/view/hive2/ConnectionSystem.java | 20 +++++---- .../ambari/view/hive2/actor/JdbcConnector.java | 4 +- .../view/hive2/actor/OperationController.java | 45 ++++++++++++-------- .../actor/message/GetColumnMetadataJob.java | 18 ++++---- .../view/hive2/actor/message/HiveJob.java | 17 +------- .../hive2/actor/message/SQLStatementJob.java | 8 ++-- .../view/hive2/client/DDLDelegatorImpl.java | 4 +- .../jobs/viewJobs/JobControllerImpl.java | 2 +- 8 files changed, 58 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java index f026ea6..860a49c 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java @@ -47,13 +47,14 @@ public class ConnectionSystem { private static Map<String, ActorRef> operationControllerMap = new HashMap<>(); private ConnectionSystem() { - this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);; + this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME); + ; } public static ConnectionSystem getInstance() { - if(instance == null) { + if (instance == null) { synchronized (lock) { - if(instance == null) { + if (instance == null) { instance = new ConnectionSystem(); } } @@ -61,10 +62,10 @@ public class ConnectionSystem { return instance; } - private ActorRef createOperationController() { + private ActorRef createOperationController(ViewContext context) { ActorRef deathWatch = actorSystem.actorOf(Props.create(DeathWatch.class)); return actorSystem.actorOf( - Props.create(OperationController.class, actorSystem,deathWatch, + Props.create(OperationController.class, actorSystem, deathWatch, context, new ConnectionSupplier(), new DataStorageSupplier(), new HdfsApiSupplier())); } @@ -74,17 +75,18 @@ public class ConnectionSystem { /** * Returns one operationController per View Instance + * * @param context * @return operationController Instance */ public ActorRef getOperationController(ViewContext context) { String instanceName = context.getInstanceName(); ActorRef ref = operationControllerMap.get(instanceName); - if(ref == null) { + if (ref == null) { synchronized (lock) { ref = operationControllerMap.get(instanceName); - if(ref == null) { - ref = createOperationController(); + if (ref == null) { + ref = createOperationController(context); operationControllerMap.put(instanceName, ref); } } @@ -93,7 +95,7 @@ public class ConnectionSystem { } public void shutdown() { - if(!actorSystem.isTerminated()) { + if (!actorSystem.isTerminated()) { actorSystem.shutdown(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java index 1894739..8f21667 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java @@ -354,7 +354,9 @@ public class JdbcConnector extends HiveActor { } private ActorRef getStatementExecutor() { - return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate).withDispatcher("akka.actor.result-dispatcher"), "StatementExecutor"); + return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate) + .withDispatcher("akka.actor.result-dispatcher"), + "StatementExecutor:" + UUID.randomUUID().toString()); } private boolean isAsync() { http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java index 0681d55..fa1ddd5 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java @@ -32,11 +32,9 @@ import org.apache.ambari.view.hive2.actor.message.HiveJob; import org.apache.ambari.view.hive2.actor.message.HiveMessage; import org.apache.ambari.view.hive2.actor.message.JobRejected; import org.apache.ambari.view.hive2.actor.message.RegisterActor; -import org.apache.ambari.view.hive2.actor.message.ResultNotReady; -import org.apache.ambari.view.hive2.actor.message.ResultReady; import org.apache.ambari.view.hive2.actor.message.SQLStatementJob; -import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; import org.apache.ambari.view.hive2.actor.message.job.CancelJob; +import org.apache.ambari.view.hive2.actor.message.job.FetchFailed; import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector; import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector; import org.apache.ambari.view.hive2.internal.ContextSupplier; @@ -94,13 +92,18 @@ public class OperationController extends HiveActor { */ private final Map<String, Set<ActorRef>> syncBusyConnections; + + private final ViewContext context; + public OperationController(ActorSystem system, ActorRef deathWatch, + ViewContext context, ContextSupplier<ConnectionDelegate> connectionSupplier, ContextSupplier<Storage> storageSupplier, ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier) { this.system = system; this.deathWatch = deathWatch; + this.context = context; this.connectionSupplier = connectionSupplier; this.storageSupplier = storageSupplier; this.hdfsApiSupplier = hdfsApiSupplier; @@ -151,7 +154,9 @@ public class OperationController extends HiveActor { if (actorRef != null) { actorRef.tell(message, sender()); } else { - LOG.error("Failed to find a running job. Cannot cancel jobId: {}.", message.getJobId()); + String msg = String.format("Cannot cancel job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName()); + LOG.error(msg); + sender().tell(new FetchFailed(msg), self()); } } @@ -159,8 +164,12 @@ public class OperationController extends HiveActor { String jobId = message.getJobId(); String username = message.getUsername(); ActorRef actorRef = asyncBusyConnections.get(username).get(jobId); - if(actorRef != null) { + if (actorRef != null) { actorRef.tell(message, sender()); + } else { + String msg = String.format("Cannot fetch error for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName()); + LOG.error(msg); + sender().tell(new FetchFailed(msg), self()); } } @@ -170,6 +179,10 @@ public class OperationController extends HiveActor { ActorRef actorRef = asyncBusyConnections.get(username).get(jobId); if (actorRef != null) { actorRef.tell(message, sender()); + } else { + String msg = String.format("Cannot fetch result for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName()); + LOG.error(msg); + sender().tell(new FetchFailed(msg), self()); } } @@ -179,9 +192,8 @@ public class OperationController extends HiveActor { ActorRef subActor = null; // Check if there is available actors to process this subActor = getActorRefFromAsyncPool(username); - ViewContext viewContext = job.getViewContext(); if (subActor == null) { - Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext); + Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context); if (!hdfsApiOptional.isPresent()) { sender().tell(new JobRejected(username, jobId, "Failed to connect to Hive."), self()); return; @@ -189,10 +201,10 @@ public class OperationController extends HiveActor { HdfsApi hdfsApi = hdfsApiOptional.get(); subActor = system.actorOf( - Props.create(JdbcConnector.class, viewContext, self(), - deathWatch, hdfsApi, connectionSupplier.get(viewContext), - storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"), - "jobId:" + jobId + ":asyncjdbcConnector"); + Props.create(JdbcConnector.class, context, self(), + deathWatch, hdfsApi, connectionSupplier.get(context), + storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"), + UUID.randomUUID().toString() + ":asyncjdbcConnector"); deathWatch.tell(new RegisterActor(subActor), self()); } @@ -242,10 +254,9 @@ public class OperationController extends HiveActor { ActorRef subActor = null; // Check if there is available actors to process this subActor = getActorRefFromSyncPool(username); - ViewContext viewContext = job.getViewContext(); if (subActor == null) { - Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext); + Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context); if (!hdfsApiOptional.isPresent()) { sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender()); return; @@ -253,10 +264,10 @@ public class OperationController extends HiveActor { HdfsApi hdfsApi = hdfsApiOptional.get(); subActor = system.actorOf( - Props.create(JdbcConnector.class, viewContext, self(), - deathWatch, hdfsApi, connectionSupplier.get(viewContext), - storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"), - UUID.randomUUID().toString() + ":SyncjdbcConnector"); + Props.create(JdbcConnector.class, context, self(), + deathWatch, hdfsApi, connectionSupplier.get(context), + storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"), + UUID.randomUUID().toString() + ":syncjdbcConnector"); deathWatch.tell(new RegisterActor(subActor), self()); } http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java index e285e36..9d2e9cc 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java @@ -18,33 +18,31 @@ package org.apache.ambari.view.hive2.actor.message; -import org.apache.ambari.view.ViewContext; - public class GetColumnMetadataJob extends HiveJob { private final String schemaPattern; private final String tablePattern; private final String columnPattern; - public GetColumnMetadataJob(String username, ViewContext viewContext, + public GetColumnMetadataJob(String username, String schemaPattern, String tablePattern, String columnPattern) { - super(Type.SYNC, username, viewContext); + super(Type.SYNC, username); this.schemaPattern = schemaPattern; this.tablePattern = tablePattern; this.columnPattern = columnPattern; } - public GetColumnMetadataJob(String username, ViewContext viewContext, + public GetColumnMetadataJob(String username, String tablePattern, String columnPattern) { - this(username, viewContext, "*", tablePattern, columnPattern); + this(username, "*", tablePattern, columnPattern); } - public GetColumnMetadataJob(String username, ViewContext viewContext, + public GetColumnMetadataJob(String username, String columnPattern) { - this(username, viewContext, "*", "*", columnPattern); + this(username, "*", "*", columnPattern); } - public GetColumnMetadataJob(String username, ViewContext viewContext) { - this(username, viewContext, "*", "*", "*"); + public GetColumnMetadataJob(String username) { + this(username, "*", "*", "*"); } public String getSchemaPattern() { http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java index ee3c1be..74400e2 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java @@ -18,23 +18,14 @@ package org.apache.ambari.view.hive2.actor.message; -import com.google.common.collect.ImmutableList; -import org.apache.ambari.view.ViewContext; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; - public abstract class HiveJob { private final String username; private final Type type; - private final ViewContext viewContext; - public HiveJob(Type type, String username,ViewContext viewContext) { + public HiveJob(Type type, String username) { this.type = type; this.username = username; - this.viewContext = viewContext; } public String getUsername() { @@ -49,12 +40,6 @@ public abstract class HiveJob { } - - public ViewContext getViewContext() { - return viewContext; - } - - public enum Type { SYNC, ASYNC http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java index f5af068..03483d0 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java @@ -34,8 +34,8 @@ public class SQLStatementJob extends HiveJob { private final String jobId; private final String logFile; - public SQLStatementJob(Type type, String[] statements, String username, String jobId, String logFile, ViewContext viewContext) { - super(type, username, viewContext); + public SQLStatementJob(Type type, String[] statements, String username, String jobId, String logFile) { + super(type, username); this.statements = new String[statements.length]; this.jobId = jobId; this.logFile = logFile; @@ -43,8 +43,8 @@ public class SQLStatementJob extends HiveJob { this.statements[i] = clean(statements[i]); } } - public SQLStatementJob(Type type, String[] statements, String username, ViewContext viewContext) { - this(type, statements, username, null, null, viewContext); + public SQLStatementJob(Type type, String[] statements, String username) { + this(type, statements, username, null, null); } private String clean(String statement) { http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java index 72eca4c..0171626 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java @@ -144,7 +144,7 @@ public class DDLDelegatorImpl implements DDLDelegator { private Optional<Result> getRowsFromDB(ConnectionConfig config, String[] statements) { Connect connect = config.createConnectMessage(); - HiveJob job = new SQLStatementJob(HiveJob.Type.SYNC, statements, config.getUsername(), context); + HiveJob job = new SQLStatementJob(HiveJob.Type.SYNC, statements, config.getUsername()); ExecuteJob execute = new ExecuteJob(connect, job); LOG.info("Executing query: {}, for user: {}", getJoinedStatements(statements), job.getUsername()); @@ -154,7 +154,7 @@ public class DDLDelegatorImpl implements DDLDelegator { private Optional<Result> getTableDescription(ConnectionConfig config, String databasePattern, String tablePattern, String columnPattern) { Connect connect = config.createConnectMessage(); - HiveJob job = new GetColumnMetadataJob(config.getUsername(), context, databasePattern, tablePattern, columnPattern); + HiveJob job = new GetColumnMetadataJob(config.getUsername(), databasePattern, tablePattern, columnPattern); ExecuteJob execute = new ExecuteJob(connect, job); LOG.info("Executing query to fetch the column description for dbPattern: {}, tablePattern: {}, columnPattern: {}, for user: {}", http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java index aab6e71..e94d727 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java @@ -107,7 +107,7 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg String query = getQueryForJob(); ConnectionSystem system = ConnectionSystem.getInstance(); AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem()); - SQLStatementJob asyncJob = new SQLStatementJob(HiveJob.Type.ASYNC, getStatements(jobDatabase, query), context.getUsername(), job.getId(), job.getLogFile(), context); + SQLStatementJob asyncJob = new SQLStatementJob(HiveJob.Type.ASYNC, getStatements(jobDatabase, query), context.getUsername(), job.getId(), job.getLogFile()); asyncJobRunner.submitJob(getHiveConnectionConfig(), asyncJob, job); }
