Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 88fc02338 -> 92efc25eb


AMBARI-17269. INSERT and SELECT not working. jobid was used in actor names to 
make it unique but jobid is not a unique identifier across instances. (dipayanb)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/92efc25e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/92efc25e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/92efc25e

Branch: refs/heads/branch-2.4
Commit: 92efc25eb7063af57552e3061620252cde38228e
Parents: 88fc023
Author: Dipayan Bhowmick <[email protected]>
Authored: Thu Jun 16 15:22:42 2016 +0530
Committer: Dipayan Bhowmick <[email protected]>
Committed: Fri Jun 17 07:57:00 2016 +0530

----------------------------------------------------------------------
 .../ambari/view/hive2/ConnectionSystem.java     | 20 +++++----
 .../ambari/view/hive2/actor/JdbcConnector.java  |  4 +-
 .../view/hive2/actor/OperationController.java   | 45 ++++++++++++--------
 .../actor/message/GetColumnMetadataJob.java     | 18 ++++----
 .../view/hive2/actor/message/HiveJob.java       | 17 +-------
 .../hive2/actor/message/SQLStatementJob.java    |  8 ++--
 .../view/hive2/client/DDLDelegatorImpl.java     |  4 +-
 .../jobs/viewJobs/JobControllerImpl.java        |  2 +-
 8 files changed, 58 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
----------------------------------------------------------------------
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
index f026ea6..860a49c 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionSystem.java
@@ -47,13 +47,14 @@ public class ConnectionSystem {
   private static Map<String, ActorRef> operationControllerMap = new 
HashMap<>();
 
   private ConnectionSystem() {
-    this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);;
+    this.actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME);
+    ;
   }
 
   public static ConnectionSystem getInstance() {
-    if(instance == null) {
+    if (instance == null) {
       synchronized (lock) {
-        if(instance == null) {
+        if (instance == null) {
           instance = new ConnectionSystem();
         }
       }
@@ -61,10 +62,10 @@ public class ConnectionSystem {
     return instance;
   }
 
-  private ActorRef createOperationController() {
+  private ActorRef createOperationController(ViewContext context) {
     ActorRef deathWatch = actorSystem.actorOf(Props.create(DeathWatch.class));
     return actorSystem.actorOf(
-      Props.create(OperationController.class, actorSystem,deathWatch,
+      Props.create(OperationController.class, actorSystem, deathWatch, context,
         new ConnectionSupplier(), new DataStorageSupplier(), new 
HdfsApiSupplier()));
   }
 
@@ -74,17 +75,18 @@ public class ConnectionSystem {
 
   /**
    * Returns one operationController per View Instance
+   *
    * @param context
    * @return operationController Instance
    */
   public ActorRef getOperationController(ViewContext context) {
     String instanceName = context.getInstanceName();
     ActorRef ref = operationControllerMap.get(instanceName);
-    if(ref == null) {
+    if (ref == null) {
       synchronized (lock) {
         ref = operationControllerMap.get(instanceName);
-        if(ref == null) {
-          ref = createOperationController();
+        if (ref == null) {
+          ref = createOperationController(context);
           operationControllerMap.put(instanceName, ref);
         }
       }
@@ -93,7 +95,7 @@ public class ConnectionSystem {
   }
 
   public void shutdown() {
-    if(!actorSystem.isTerminated()) {
+    if (!actorSystem.isTerminated()) {
       actorSystem.shutdown();
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
----------------------------------------------------------------------
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
index 1894739..8f21667 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
@@ -354,7 +354,9 @@ public class JdbcConnector extends HiveActor {
   }
 
   private ActorRef getStatementExecutor() {
-    return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, 
storage, connectable.getConnection().get(), 
connectionDelegate).withDispatcher("akka.actor.result-dispatcher"), 
"StatementExecutor");
+    return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, 
storage, connectable.getConnection().get(), connectionDelegate)
+      .withDispatcher("akka.actor.result-dispatcher"),
+      "StatementExecutor:" + UUID.randomUUID().toString());
   }
 
   private boolean isAsync() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
----------------------------------------------------------------------
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
index 0681d55..fa1ddd5 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
@@ -32,11 +32,9 @@ import org.apache.ambari.view.hive2.actor.message.HiveJob;
 import org.apache.ambari.view.hive2.actor.message.HiveMessage;
 import org.apache.ambari.view.hive2.actor.message.JobRejected;
 import org.apache.ambari.view.hive2.actor.message.RegisterActor;
-import org.apache.ambari.view.hive2.actor.message.ResultNotReady;
-import org.apache.ambari.view.hive2.actor.message.ResultReady;
 import org.apache.ambari.view.hive2.actor.message.SQLStatementJob;
-import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
 import org.apache.ambari.view.hive2.actor.message.job.CancelJob;
+import org.apache.ambari.view.hive2.actor.message.job.FetchFailed;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
 import org.apache.ambari.view.hive2.internal.ContextSupplier;
@@ -94,13 +92,18 @@ public class OperationController extends HiveActor {
    */
   private final Map<String, Set<ActorRef>> syncBusyConnections;
 
+
+  private final ViewContext context;
+
   public OperationController(ActorSystem system,
                              ActorRef deathWatch,
+                             ViewContext context,
                              ContextSupplier<ConnectionDelegate> 
connectionSupplier,
                              ContextSupplier<Storage> storageSupplier,
                              ContextSupplier<Optional<HdfsApi>> 
hdfsApiSupplier) {
     this.system = system;
     this.deathWatch = deathWatch;
+    this.context = context;
     this.connectionSupplier = connectionSupplier;
     this.storageSupplier = storageSupplier;
     this.hdfsApiSupplier = hdfsApiSupplier;
@@ -151,7 +154,9 @@ public class OperationController extends HiveActor {
     if (actorRef != null) {
       actorRef.tell(message, sender());
     } else {
-      LOG.error("Failed to find a running job. Cannot cancel jobId: {}.", 
message.getJobId());
+      String msg = String.format("Cannot cancel job. Job with id: %s for 
instance: %s has either not started or has expired.", message.getJobId(), 
context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
     }
   }
 
@@ -159,8 +164,12 @@ public class OperationController extends HiveActor {
     String jobId = message.getJobId();
     String username = message.getUsername();
     ActorRef actorRef = asyncBusyConnections.get(username).get(jobId);
-    if(actorRef != null) {
+    if (actorRef != null) {
       actorRef.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot fetch error for job. Job with id: %s 
for instance: %s has either not started or has expired.", message.getJobId(), 
context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
     }
   }
 
@@ -170,6 +179,10 @@ public class OperationController extends HiveActor {
     ActorRef actorRef = asyncBusyConnections.get(username).get(jobId);
     if (actorRef != null) {
       actorRef.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot fetch result for job. Job with id: %s 
for instance: %s has either not started or has expired.", message.getJobId(), 
context.getInstanceName());
+      LOG.error(msg);
+      sender().tell(new FetchFailed(msg), self());
     }
   }
 
@@ -179,9 +192,8 @@ public class OperationController extends HiveActor {
     ActorRef subActor = null;
     // Check if there is available actors to process this
     subActor = getActorRefFromAsyncPool(username);
-    ViewContext viewContext = job.getViewContext();
     if (subActor == null) {
-      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
       if (!hdfsApiOptional.isPresent()) {
         sender().tell(new JobRejected(username, jobId, "Failed to connect to 
Hive."), self());
         return;
@@ -189,10 +201,10 @@ public class OperationController extends HiveActor {
       HdfsApi hdfsApi = hdfsApiOptional.get();
 
       subActor = system.actorOf(
-        Props.create(JdbcConnector.class, viewContext, self(),
-          deathWatch, hdfsApi, connectionSupplier.get(viewContext),
-          
storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
-        "jobId:" + jobId + ":asyncjdbcConnector");
+        Props.create(JdbcConnector.class, context, self(),
+          deathWatch, hdfsApi, connectionSupplier.get(context),
+          
storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+        UUID.randomUUID().toString() + ":asyncjdbcConnector");
       deathWatch.tell(new RegisterActor(subActor), self());
     }
 
@@ -242,10 +254,9 @@ public class OperationController extends HiveActor {
     ActorRef subActor = null;
     // Check if there is available actors to process this
     subActor = getActorRefFromSyncPool(username);
-    ViewContext viewContext = job.getViewContext();
 
     if (subActor == null) {
-      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
+      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
       if (!hdfsApiOptional.isPresent()) {
         sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, 
"Failed to connect to HDFS."), ActorRef.noSender());
         return;
@@ -253,10 +264,10 @@ public class OperationController extends HiveActor {
       HdfsApi hdfsApi = hdfsApiOptional.get();
 
       subActor = system.actorOf(
-        Props.create(JdbcConnector.class, viewContext, self(),
-          deathWatch, hdfsApi,  connectionSupplier.get(viewContext),
-          
storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
-        UUID.randomUUID().toString() + ":SyncjdbcConnector");
+        Props.create(JdbcConnector.class, context, self(),
+          deathWatch, hdfsApi, connectionSupplier.get(context),
+          
storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
+        UUID.randomUUID().toString() + ":syncjdbcConnector");
       deathWatch.tell(new RegisterActor(subActor), self());
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
----------------------------------------------------------------------
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
index e285e36..9d2e9cc 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java
@@ -18,33 +18,31 @@
 
 package org.apache.ambari.view.hive2.actor.message;
 
-import org.apache.ambari.view.ViewContext;
-
 public class GetColumnMetadataJob extends HiveJob {
   private final String schemaPattern;
   private final String tablePattern;
   private final String columnPattern;
 
-  public GetColumnMetadataJob(String username, ViewContext viewContext,
+  public GetColumnMetadataJob(String username,
                               String schemaPattern, String tablePattern, 
String columnPattern) {
-    super(Type.SYNC, username, viewContext);
+    super(Type.SYNC, username);
     this.schemaPattern = schemaPattern;
     this.tablePattern = tablePattern;
     this.columnPattern = columnPattern;
   }
 
-  public GetColumnMetadataJob(String username, ViewContext viewContext,
+  public GetColumnMetadataJob(String username,
                               String tablePattern, String columnPattern) {
-    this(username, viewContext, "*", tablePattern, columnPattern);
+    this(username, "*", tablePattern, columnPattern);
   }
 
-  public GetColumnMetadataJob(String username, ViewContext viewContext,
+  public GetColumnMetadataJob(String username,
                               String columnPattern) {
-    this(username, viewContext, "*", "*", columnPattern);
+    this(username, "*", "*", columnPattern);
   }
 
-  public GetColumnMetadataJob(String username, ViewContext viewContext) {
-    this(username, viewContext, "*", "*", "*");
+  public GetColumnMetadataJob(String username) {
+    this(username, "*", "*", "*");
   }
 
   public String getSchemaPattern() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
----------------------------------------------------------------------
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
index ee3c1be..74400e2 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/HiveJob.java
@@ -18,23 +18,14 @@
 
 package org.apache.ambari.view.hive2.actor.message;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.ambari.view.ViewContext;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-
 public abstract class HiveJob {
 
   private final String username;
   private final Type type;
-  private final ViewContext viewContext;
 
-  public HiveJob(Type type, String username,ViewContext viewContext) {
+  public HiveJob(Type type, String username) {
     this.type = type;
     this.username = username;
-    this.viewContext = viewContext;
   }
 
   public String getUsername() {
@@ -49,12 +40,6 @@ public abstract class HiveJob {
   }
 
 
-
-  public ViewContext getViewContext() {
-    return viewContext;
-  }
-
-
   public enum Type {
     SYNC,
     ASYNC

http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java
----------------------------------------------------------------------
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java
index f5af068..03483d0 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java
@@ -34,8 +34,8 @@ public class SQLStatementJob extends HiveJob {
   private final String jobId;
   private final String logFile;
 
-  public SQLStatementJob(Type type, String[] statements, String username, 
String jobId, String logFile, ViewContext viewContext) {
-    super(type, username, viewContext);
+  public SQLStatementJob(Type type, String[] statements, String username, 
String jobId, String logFile) {
+    super(type, username);
     this.statements = new String[statements.length];
     this.jobId = jobId;
     this.logFile = logFile;
@@ -43,8 +43,8 @@ public class SQLStatementJob extends HiveJob {
       this.statements[i] = clean(statements[i]);
     }
   }
-  public SQLStatementJob(Type type, String[] statements, String username, 
ViewContext viewContext) {
-    this(type, statements, username, null, null, viewContext);
+  public SQLStatementJob(Type type, String[] statements, String username) {
+    this(type, statements, username, null, null);
   }
 
   private String clean(String statement) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java
----------------------------------------------------------------------
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java
index 72eca4c..0171626 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java
@@ -144,7 +144,7 @@ public class DDLDelegatorImpl implements DDLDelegator {
 
   private Optional<Result> getRowsFromDB(ConnectionConfig config, String[] 
statements) {
     Connect connect = config.createConnectMessage();
-    HiveJob job = new SQLStatementJob(HiveJob.Type.SYNC, statements, 
config.getUsername(), context);
+    HiveJob job = new SQLStatementJob(HiveJob.Type.SYNC, statements, 
config.getUsername());
     ExecuteJob execute = new ExecuteJob(connect, job);
 
     LOG.info("Executing query: {}, for user: {}", 
getJoinedStatements(statements), job.getUsername());
@@ -154,7 +154,7 @@ public class DDLDelegatorImpl implements DDLDelegator {
 
   private Optional<Result> getTableDescription(ConnectionConfig config, String 
databasePattern, String tablePattern, String columnPattern) {
     Connect connect = config.createConnectMessage();
-    HiveJob job = new GetColumnMetadataJob(config.getUsername(), context, 
databasePattern, tablePattern, columnPattern);
+    HiveJob job = new GetColumnMetadataJob(config.getUsername(), 
databasePattern, tablePattern, columnPattern);
     ExecuteJob execute = new ExecuteJob(connect, job);
 
     LOG.info("Executing query to fetch the column description for dbPattern: 
{}, tablePattern: {}, columnPattern: {}, for user: {}",

http://git-wip-us.apache.org/repos/asf/ambari/blob/92efc25e/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java
index aab6e71..e94d727 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java
@@ -107,7 +107,7 @@ public class JobControllerImpl implements JobController, 
ModifyNotificationDeleg
         String query = getQueryForJob();
         ConnectionSystem system = ConnectionSystem.getInstance();
         AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, 
system.getOperationController(context), system.getActorSystem());
-        SQLStatementJob asyncJob = new SQLStatementJob(HiveJob.Type.ASYNC, 
getStatements(jobDatabase, query), context.getUsername(), job.getId(), 
job.getLogFile(), context);
+        SQLStatementJob asyncJob = new SQLStatementJob(HiveJob.Type.ASYNC, 
getStatements(jobDatabase, query), context.getUsername(), job.getId(), 
job.getLogFile());
         asyncJobRunner.submitJob(getHiveConnectionConfig(), asyncJob, job);
 
     }

Reply via email to