Repository: asterixdb
Updated Branches:
  refs/heads/master 51e381277 -> da7e8a16d


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java
index 7c85d5a..83585d1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/org/apache/hyracks/examples/shutdown/test/ClusterShutdownIT.java
@@ -20,9 +20,9 @@ package org.apache.hyracks.examples.shutdown.test;
 
 import java.net.ServerSocket;
 
-import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml
index f412499..c00ffc1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -62,6 +62,11 @@
       <artifactId>hyracks-data-std</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-ipc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index 23a6be0..2004be9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.examples.text.client;
 import java.io.File;
 import java.util.EnumSet;
 
-import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -63,6 +62,7 @@ import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOpera
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 import org.apache.hyracks.examples.text.WordTupleParserFactory;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index e1e2006..31bcf2e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -56,6 +56,11 @@
       <artifactId>hyracks-api</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-ipc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
index 80c4f88..cb9307b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -23,7 +23,6 @@ import static 
org.apache.hyracks.examples.tpch.client.Common.lineitemDesc;
 import static 
org.apache.hyracks.examples.tpch.client.Common.lineitemParserFactories;
 import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
 
-import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -58,6 +57,7 @@ import 
org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFa
 import 
org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import 
org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index c3d0df1..a0d40ee 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -25,7 +25,6 @@ import static 
org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
 
 import java.util.EnumSet;
 
-import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -65,6 +64,7 @@ import 
org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
 import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index 5043974..08d1031 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -19,11 +19,13 @@
 
 package org.apache.hyracks.examples.tpch.client;
 
-import static org.apache.hyracks.examples.tpch.client.Common.*;
+import static 
org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static 
org.apache.hyracks.examples.tpch.client.Common.orderParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.ordersDesc;
+import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
 
 import java.util.EnumSet;
 
-import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -50,6 +52,7 @@ import 
org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
index cba83e3..1a6422b 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
@@ -189,6 +189,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-ipc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index 9633fb1..bc187f8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -33,7 +33,6 @@ import java.util.Random;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -41,6 +40,7 @@ import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.hdfs.api.INcCollection;
 import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
index 8f96bab..c2ba188 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -54,6 +53,7 @@ import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory;
 import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
 import org.apache.hyracks.hdfs.utils.HyracksUtils;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.util.file.FileUtil;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
index 1fddc46..17cd793 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.hdfs.utils;
 
 import java.util.EnumSet;
 
-import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
index 02c0a20..04fdc85 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index d014f3b..4db3418 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -54,5 +54,18 @@
       <artifactId>hyracks-util</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..a61c96d
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.impl;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultSetId;
+
+public class HyracksClientInterfaceFunctions {
+    public enum FunctionId {
+        GET_CLUSTER_CONTROLLER_INFO,
+        GET_CLUSTER_TOPOLOGY,
+        GET_JOB_STATUS,
+        GET_JOB_INFO,
+        START_JOB,
+        DEPLOY_JOB,
+        UNDEPLOY_JOB,
+        REDEPLOY_JOB,
+        CANCEL_JOB,
+        GET_RESULT_DIRECTORY_ADDRESS,
+        GET_RESULT_STATUS,
+        GET_RESULT_LOCATIONS,
+        WAIT_FOR_COMPLETION,
+        GET_NODE_CONTROLLERS_INFO,
+        CLI_DEPLOY_BINARY,
+        CLI_UNDEPLOY_BINARY,
+        CLUSTER_SHUTDOWN,
+        GET_NODE_DETAILS_JSON,
+        THREAD_DUMP
+    }
+
+    public abstract static class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class GetClusterControllerInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+        }
+    }
+
+    public static class GetJobStatusFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public GetJobStatusFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_JOB_STATUS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class GetJobInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public GetJobInfoFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_JOB_INFO;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class redeployJobSpecFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final byte[] acggfBytes;
+
+        private final DeployedJobSpecId deployedJobSpecId;
+
+        public redeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, 
byte[] acggfBytes) {
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.acggfBytes = acggfBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REDEPLOY_JOB;
+        }
+
+        public byte[] getACGGFBytes() {
+            return acggfBytes;
+        }
+
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
+        }
+    }
+
+    public static class DeployJobSpecFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final byte[] acggfBytes;
+
+        public DeployJobSpecFunction(byte[] acggfBytes) {
+            this.acggfBytes = acggfBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DEPLOY_JOB;
+        }
+
+        public byte[] getACGGFBytes() {
+            return acggfBytes;
+        }
+    }
+
+    public static class CancelJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public CancelJobFunction(JobId jobId) {
+            this.jobId = jobId;
+            if (jobId == null) {
+                throw new IllegalArgumentException("jobId");
+            }
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CANCEL_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class UndeployJobSpecFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final DeployedJobSpecId deployedJobSpecId;
+
+        public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+            this.deployedJobSpecId = deployedJobSpecId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.UNDEPLOY_JOB;
+        }
+
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
+        }
+    }
+
+    public static class StartJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final byte[] acggfBytes;
+        private final Set<JobFlag> jobFlags;
+        private final DeploymentId deploymentId;
+        private final DeployedJobSpecId deployedJobSpecId;
+        private final Map<byte[], byte[]> jobParameters;
+
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
Set<JobFlag> jobFlags,
+                DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> 
jobParameters) {
+            this.acggfBytes = acggfBytes;
+            this.jobFlags = jobFlags;
+            this.deploymentId = deploymentId;
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.jobParameters = jobParameters;
+        }
+
+        public StartJobFunction(DeployedJobSpecId deployedJobSpecId, 
Map<byte[], byte[]> jobParameters) {
+            this(null, null, EnumSet.noneOf(JobFlag.class), deployedJobSpecId, 
jobParameters);
+        }
+
+        public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(null, acggfBytes, jobFlags, null, null);
+        }
+
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
Set<JobFlag> jobFlags) {
+            this(deploymentId, acggfBytes, jobFlags, null, null);
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_JOB;
+        }
+
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
+        }
+
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
+        }
+
+        public byte[] getACGGFBytes() {
+            return acggfBytes;
+        }
+
+        public Set<JobFlag> getJobFlags() {
+            return jobFlags;
+        }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+    }
+
+    public static class GetResultDirectoryAddressFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_RESULT_DIRECTORY_ADDRESS;
+        }
+    }
+
+    public static class GetResultStatusFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final ResultSetId rsId;
+
+        public GetResultStatusFunction(JobId jobId, ResultSetId rsId) {
+            this.jobId = jobId;
+            this.rsId = rsId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_RESULT_STATUS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+    }
+
+    public static class GetResultLocationsFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final ResultSetId rsId;
+
+        private final ResultDirectoryRecord[] knownRecords;
+
+        public GetResultLocationsFunction(JobId jobId, ResultSetId rsId, 
ResultDirectoryRecord[] knownRecords) {
+            this.jobId = jobId;
+            this.rsId = rsId;
+            this.knownRecords = knownRecords;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_RESULT_LOCATIONS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+
+        public ResultDirectoryRecord[] getKnownRecords() {
+            return knownRecords;
+        }
+    }
+
+    public static class WaitForCompletionFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public WaitForCompletionFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.WAIT_FOR_COMPLETION;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class GetNodeControllersInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_NODE_CONTROLLERS_INFO;
+        }
+    }
+
+    public static class GetClusterTopologyFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_CLUSTER_TOPOLOGY;
+        }
+    }
+
+    public static class CliDeployBinaryFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private final List<URL> binaryURLs;
+        private final DeploymentId deploymentId;
+
+        public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId 
deploymentId) {
+            this.binaryURLs = binaryURLs;
+            this.deploymentId = deploymentId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLI_DEPLOY_BINARY;
+        }
+
+        public List<URL> getBinaryURLs() {
+            return binaryURLs;
+        }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+    }
+
+    public static class CliUnDeployBinaryFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private final DeploymentId deploymentId;
+
+        public CliUnDeployBinaryFunction(DeploymentId deploymentId) {
+            this.deploymentId = deploymentId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLI_UNDEPLOY_BINARY;
+        }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+    }
+
+    public static class ClusterShutdownFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private final boolean terminateNCService;
+
+        public ClusterShutdownFunction(boolean terminateNCService) {
+            this.terminateNCService = terminateNCService;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLUSTER_SHUTDOWN;
+        }
+
+        public boolean isTerminateNCService() {
+            return terminateNCService;
+        }
+    }
+
+    public static class GetNodeDetailsJSONFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private final String nodeId;
+        private final boolean includeStats;
+        private final boolean includeConfig;
+
+        public GetNodeDetailsJSONFunction(String nodeId, boolean includeStats, 
boolean includeConfig) {
+            this.nodeId = nodeId;
+            this.includeStats = includeStats;
+            this.includeConfig = includeConfig;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public boolean isIncludeStats() {
+            return includeStats;
+        }
+
+        public boolean isIncludeConfig() {
+            return includeConfig;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_NODE_DETAILS_JSON;
+        }
+    }
+
+    public static class ThreadDumpFunction extends Function {
+        private final String node;
+
+        public ThreadDumpFunction(String node) {
+            this.node = node;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.THREAD_DUMP;
+        }
+
+        public String getNode() {
+            return node;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..3fac7da
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.impl;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.client.IHyracksClientInterface;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.RPCInterface;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+
+public class HyracksClientInterfaceRemoteProxy implements 
IHyracksClientInterface {
+    private static final int SHUTDOWN_CONNECTION_TIMEOUT_SECS = 30;
+
+    private final IIPCHandle ipcHandle;
+
+    private final RPCInterface rpci;
+
+    public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle, 
RPCInterface rpci) {
+        this.ipcHandle = ipcHandle;
+        this.rpci = rpci;
+    }
+
+    @Override
+    public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif 
=
+                new 
HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+        return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                new 
HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId);
+        return (JobStatus) rpci.call(ipcHandle, gjsf);
+    }
+
+    @Override
+    public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws 
Exception {
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new 
HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public void cancelJob(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.CancelJobFunction cjf =
+                new HyracksClientInterfaceFunctions.CancelJobFunction(jobId);
+        rpci.call(ipcHandle, cjf);
+    }
+
+    @Override
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], 
byte[]> jobParameters) throws Exception {
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new 
HyracksClientInterfaceFunctions.StartJobFunction(deployedJobSpecId, 
jobParameters);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, 
EnumSet<JobFlag> jobFlags) throws Exception {
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new 
HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes, 
jobFlags);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception 
{
+        HyracksClientInterfaceFunctions.DeployJobSpecFunction sjf =
+                new 
HyracksClientInterfaceFunctions.DeployJobSpecFunction(acggfBytes);
+        return (DeployedJobSpecId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] 
acggfBytes) throws Exception {
+        HyracksClientInterfaceFunctions.redeployJobSpecFunction udjsf =
+                new 
HyracksClientInterfaceFunctions.redeployJobSpecFunction(deployedJobSpecId, 
acggfBytes);
+        rpci.call(ipcHandle, udjsf);
+    }
+
+    @Override
+    public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws 
Exception {
+        HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf =
+                new 
HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId);
+        rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public NetworkAddress getResultDirectoryAddress() throws Exception {
+        HyracksClientInterfaceFunctions.GetResultDirectoryAddressFunction 
gddsf =
+                new 
HyracksClientInterfaceFunctions.GetResultDirectoryAddressFunction();
+        return (NetworkAddress) rpci.call(ipcHandle, gddsf);
+    }
+
+    @Override
+    public void waitForCompletion(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+                new 
HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId);
+        rpci.call(ipcHandle, wfcf);
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws 
Exception {
+        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif =
+                new 
HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+        return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
+    }
+
+    @Override
+    public ClusterTopology getClusterTopology() throws Exception {
+        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf =
+                new 
HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+        return (ClusterTopology) rpci.call(ipcHandle, gctf);
+    }
+
+    @Override
+    public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) 
throws Exception {
+        HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                new 
HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, 
deploymentId);
+        rpci.call(ipcHandle, dbf);
+    }
+
+    @Override
+    public void unDeployBinary(DeploymentId deploymentId) throws Exception {
+        HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf =
+                new 
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId);
+        rpci.call(ipcHandle, dbf);
+    }
+
+    @Override
+    public JobInfo getJobInfo(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+                new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId);
+        return (JobInfo) rpci.call(ipcHandle, gjsf);
+    }
+
+    @Override
+    public void stopCluster(boolean terminateNCService) throws Exception {
+        HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf =
+                new 
HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService);
+        rpci.call(ipcHandle, csdf);
+        int i = 0;
+        // give the CC some time to do final settling after it returns our 
request
+        while (ipcHandle.isConnected() && i++ < 
SHUTDOWN_CONNECTION_TIMEOUT_SECS) {
+            synchronized (this) {
+                wait(TimeUnit.SECONDS.toMillis(1));
+            }
+        }
+        if (ipcHandle.isConnected()) {
+            throw new IPCException(
+                    "CC refused to release connection after " + 
SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds");
+        }
+    }
+
+    @Override
+    public String getNodeDetailsJSON(String nodeId, boolean includeStats, 
boolean includeConfig) throws Exception {
+        HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gjsf =
+                new 
HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction(nodeId, 
includeStats, includeConfig);
+        return (String) rpci.call(ipcHandle, gjsf);
+    }
+
+    @Override
+    public String getThreadDump(String node) throws Exception {
+        HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
+                new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
+        return (String) rpci.call(ipcHandle, tdf);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return ipcHandle.isConnected();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
new file mode 100644
index 0000000..e6c28fa
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.impl;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.IHyracksClientInterface;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import 
org.apache.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.ipc.api.RPCInterface;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.InterruptibleAction;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
+ * Controller.
+ *
+ * @author vinayakb
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final String ccHost;
+
+    private final int ccPort;
+
+    private final IPCSystem ipc;
+
+    private final IHyracksClientInterface hci;
+
+    private final ClusterControllerInfo ccInfo;
+
+    private volatile boolean running = false;
+
+    private volatile long reqId = 0L;
+
+    private final ExecutorService uninterruptibleExecutor =
+            Executors.newFixedThreadPool(2, r -> new Thread(r, 
"HyracksConnection Uninterrubtible thread: "));
+
+    private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = 
new ArrayBlockingQueue<>(1);
+
+    /**
+     * Constructor to create a connection to the Hyracks Cluster Controller.
+     *
+     * @param ccHost
+     *            Host name (or IP Address) where the Cluster Controller can be
+     *            reached.
+     * @param ccPort
+     *            Port to reach the Hyracks Cluster Controller at the specified
+     *            host name.
+     * @throws Exception
+     */
+    public HyracksConnection(String ccHost, int ccPort) throws Exception {
+        this.ccHost = ccHost;
+        this.ccPort = ccPort;
+        RPCInterface rpci = new RPCInterface();
+        ipc = new IPCSystem(new InetSocketAddress(0), rpci, new 
JavaSerializationBasedPayloadSerializerDeserializer());
+        ipc.start();
+        hci = new 
HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new 
InetSocketAddress(ccHost, ccPort)),
+                rpci);
+        ccInfo = hci.getClusterControllerInfo();
+        uninterruptibleExecutor.execute(new UninterrubtileRequestHandler());
+        uninterruptibleExecutor.execute(new UninterrubtileHandlerWatcher());
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        return hci.getJobStatus(jobId);
+    }
+
+    @Override
+    public void cancelJob(JobId jobId) throws Exception {
+        CancelJobRequest request = new CancelJobRequest(jobId);
+        uninterruptiblySubmitAndExecute(request);
+    }
+
+    @Override
+    public JobId startJob(JobSpecification jobSpec) throws Exception {
+        return startJob(jobSpec, EnumSet.noneOf(JobFlag.class));
+    }
+
+    @Override
+    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) 
throws Exception {
+        IActivityClusterGraphGeneratorFactory jsacggf =
+                new 
JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
+        return startJob(jsacggf, jobFlags);
+    }
+
+    @Override
+    public void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, 
JobSpecification jobSpec) throws Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+                new 
JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
+        hci.redeployJobSpec(deployedJobSpecId, 
JavaSerializationUtils.serialize(jsacggf));
+    }
+
+    @Override
+    public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws 
Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+                new 
JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
+        return deployJobSpec(jsacggf);
+    }
+
+    @Override
+    public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws 
Exception {
+        hci.undeployJobSpec(deployedJobSpecId);
+    }
+
+    @Override
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], 
byte[]> jobParameters) throws Exception {
+        StartDeployedJobRequest request = new 
StartDeployedJobRequest(deployedJobSpecId, jobParameters);
+        return interruptiblySubmitAndExecute(request);
+    }
+
+    @Override
+    public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags) throws Exception {
+        return startJob(null, acggf, jobFlags);
+    }
+
+    public DeployedJobSpecId 
deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
+        return hci.deployJobSpec(JavaSerializationUtils.serialize(acggf));
+    }
+
+    @Override
+    public NetworkAddress getResultDirectoryAddress() throws Exception {
+        return hci.getResultDirectoryAddress();
+    }
+
+    @Override
+    public void waitForCompletion(JobId jobId) throws Exception {
+        try {
+            hci.waitForCompletion(jobId);
+        } catch (InterruptedException e) {
+            // Cancels an on-going job if the current thread gets interrupted.
+            cancelJob(jobId);
+            throw e;
+        }
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws 
HyracksException {
+        try {
+            return hci.getNodeControllersInfo();
+        } catch (Exception e) {
+            throw HyracksException.create(e);
+        }
+    }
+
+    @Override
+    public ClusterTopology getClusterTopology() throws HyracksException {
+        try {
+            return hci.getClusterTopology();
+        } catch (Exception e) {
+            throw HyracksException.create(e);
+        }
+    }
+
+    @Override
+    public DeploymentId deployBinary(List<String> jars) throws Exception {
+        /** generate a deployment id */
+        DeploymentId deploymentId = new 
DeploymentId(UUID.randomUUID().toString());
+        List<URL> binaryURLs = new ArrayList<>();
+        if (jars != null && !jars.isEmpty()) {
+            CloseableHttpClient hc = new DefaultHttpClient();
+            try {
+                /** upload jars through a http client one-by-one to the CC 
server */
+                for (String jar : jars) {
+                    int slashIndex = jar.lastIndexOf('/');
+                    String fileName = jar.substring(slashIndex + 1);
+                    String url = "http://"; + ccHost + ":" + 
ccInfo.getWebPort() + "/applications/"
+                            + deploymentId.toString() + "&" + fileName;
+                    HttpPut put = new HttpPut(url);
+                    put.setEntity(new FileEntity(new File(jar), 
"application/octet-stream"));
+                    HttpResponse response = hc.execute(put);
+                    response.getEntity().consumeContent();
+                    if (response.getStatusLine().getStatusCode() != 200) {
+                        hci.unDeployBinary(deploymentId);
+                        throw new 
HyracksException(response.getStatusLine().toString());
+                    }
+                    /** add the uploaded URL address into the URLs of jars to 
be deployed at NCs */
+                    binaryURLs.add(new URL(url));
+                }
+            } finally {
+                hc.close();
+            }
+        }
+        /** deploy the URLs to the CC and NCs */
+        hci.deployBinary(binaryURLs, deploymentId);
+        return deploymentId;
+    }
+
+    @Override
+    public void unDeployBinary(DeploymentId deploymentId) throws Exception {
+        hci.unDeployBinary(deploymentId);
+    }
+
+    @Override
+    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) 
throws Exception {
+        return startJob(deploymentId, jobSpec, EnumSet.noneOf(JobFlag.class));
+    }
+
+    @Override
+    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, 
EnumSet<JobFlag> jobFlags)
+            throws Exception {
+        IActivityClusterGraphGeneratorFactory jsacggf =
+                new 
JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
+        return startJob(deploymentId, jsacggf, jobFlags);
+    }
+
+    @Override
+    public JobId startJob(DeploymentId deploymentId, 
IActivityClusterGraphGeneratorFactory acggf,
+            EnumSet<JobFlag> jobFlags) throws Exception {
+        StartJobRequest request = new StartJobRequest(deploymentId, acggf, 
jobFlags);
+        return interruptiblySubmitAndExecute(request);
+    }
+
+    @Override
+    public JobInfo getJobInfo(JobId jobId) throws Exception {
+        return hci.getJobInfo(jobId);
+    }
+
+    @Override
+    public void stopCluster(boolean terminateNCService) throws Exception {
+        hci.stopCluster(terminateNCService);
+    }
+
+    @Override
+    public String getNodeDetailsJSON(String nodeId, boolean includeStats, 
boolean includeConfig) throws Exception {
+        return hci.getNodeDetailsJSON(nodeId, includeStats, includeConfig);
+    }
+
+    @Override
+    public String getThreadDump(String node) throws Exception {
+        return hci.getThreadDump(node);
+    }
+
+    @Override
+    public String getHost() {
+        return ccHost;
+    }
+
+    @Override
+    public int getPort() {
+        return ccPort;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return hci.isConnected();
+    }
+
+    private <T> T uninterruptiblySubmitAndExecute(UnInterruptibleRequest<T> 
request) throws Exception {
+        InvokeUtil.doUninterruptibly(() -> uninterruptibles.put(request));
+        return uninterruptiblyExecute(request);
+    }
+
+    private <T> T uninterruptiblyExecute(UnInterruptibleRequest<T> request) 
throws Exception {
+        InvokeUtil.doUninterruptibly(request);
+        return request.result();
+    }
+
+    private <T> T interruptiblySubmitAndExecute(UnInterruptibleRequest<T> 
request) throws Exception {
+        uninterruptibles.put(request);
+        return uninterruptiblyExecute(request);
+    }
+
+    private abstract class UnInterruptibleRequest<T> implements 
InterruptibleAction {
+        boolean completed = false;
+        boolean failed = false;
+        Throwable failure = null;
+        T response = null;
+
+        @SuppressWarnings("squid:S1181")
+        private final void handle() {
+            try {
+                response = doHandle();
+            } catch (Throwable th) {
+                failed = true;
+                failure = th;
+            } finally {
+                synchronized (this) {
+                    completed = true;
+                    notifyAll();
+                }
+            }
+        }
+
+        protected abstract T doHandle() throws Exception;
+
+        @Override
+        public final synchronized void run() throws InterruptedException {
+            while (!completed) {
+                wait();
+            }
+        }
+
+        public T result() throws Exception {
+            if (failed) {
+                if (failure instanceof Error) {
+                    throw (Error) failure;
+                }
+                throw (Exception) failure;
+            }
+            return response;
+        }
+    }
+
+    private class CancelJobRequest extends UnInterruptibleRequest<Void> {
+        final JobId jobId;
+
+        public CancelJobRequest(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        protected Void doHandle() throws Exception {
+            hci.cancelJob(jobId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return "CancelJobRequest: " + jobId.toString();
+        }
+
+    }
+
+    private class StartDeployedJobRequest extends 
UnInterruptibleRequest<JobId> {
+
+        private final DeployedJobSpecId deployedJobSpecId;
+        private final Map<byte[], byte[]> jobParameters;
+
+        public StartDeployedJobRequest(DeployedJobSpecId deployedJobSpecId, 
Map<byte[], byte[]> jobParameters) {
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.jobParameters = jobParameters;
+        }
+
+        @Override
+        protected JobId doHandle() throws Exception {
+            return hci.startJob(deployedJobSpecId, jobParameters);
+        }
+
+    }
+
+    private class StartJobRequest extends UnInterruptibleRequest<JobId> {
+        private final DeploymentId deploymentId;
+        private final IActivityClusterGraphGeneratorFactory acggf;
+        private final EnumSet<JobFlag> jobFlags;
+
+        public StartJobRequest(DeploymentId deploymentId, 
IActivityClusterGraphGeneratorFactory acggf,
+                EnumSet<JobFlag> jobFlags) {
+            this.deploymentId = deploymentId;
+            this.acggf = acggf;
+            this.jobFlags = jobFlags;
+        }
+
+        @Override
+        protected JobId doHandle() throws Exception {
+            if (deploymentId == null) {
+                return hci.startJob(JavaSerializationUtils.serialize(acggf), 
jobFlags);
+            } else {
+                return hci.startJob(deploymentId, 
JavaSerializationUtils.serialize(acggf), jobFlags);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "StartJobRequest";
+        }
+
+    }
+
+    private class UninterrubtileRequestHandler implements Runnable {
+        @SuppressWarnings({ "squid:S2189", "squid:S2142" })
+        @Override
+        public void run() {
+            String nameBefore = Thread.currentThread().getName();
+            Thread.currentThread().setName(nameBefore + 
getClass().getSimpleName());
+            try {
+                while (true) {
+                    try {
+                        UnInterruptibleRequest<?> current = 
uninterruptibles.take();
+                        reqId++;
+                        running = true;
+                        current.handle();
+                    } catch (InterruptedException e) {
+                        LOGGER.log(Level.WARN, "Ignoring interrupt. This 
thread should never be interrupted.");
+                        continue;
+                    } finally {
+                        running = false;
+                    }
+                }
+            } finally {
+                Thread.currentThread().setName(nameBefore);
+            }
+        }
+    }
+
+    public class UninterrubtileHandlerWatcher implements Runnable {
+        @Override
+        @SuppressWarnings({ "squid:S2189", "squid:S2142" })
+        public void run() {
+            String nameBefore = Thread.currentThread().getName();
+            Thread.currentThread().setName(nameBefore + 
getClass().getSimpleName());
+            try {
+                long currentReqId = 0L;
+                long currentTime = System.nanoTime();
+                while (true) {
+                    try {
+                        TimeUnit.MINUTES.sleep(1);
+                    } catch (InterruptedException e) {
+                        LOGGER.log(Level.WARN, "Ignoring interrupt. This 
thread should never be interrupted.");
+                        continue;
+                    }
+                    if (running) {
+                        if (reqId == currentReqId) {
+                            if 
(TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
+                                
ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+                            }
+                        } else {
+                            currentReqId = reqId;
+                            currentTime = System.nanoTime();
+                        }
+                    }
+                }
+            } finally {
+                Thread.currentThread().setName(nameBefore);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/da7e8a16/hyracks-fullstack/hyracks/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/pom.xml 
b/hyracks-fullstack/hyracks/pom.xml
index 3b4178d..5990526 100644
--- a/hyracks-fullstack/hyracks/pom.xml
+++ b/hyracks-fullstack/hyracks/pom.xml
@@ -72,8 +72,8 @@
 
   <modules>
     <module>hyracks-util</module>
-    <module>hyracks-ipc</module>
     <module>hyracks-api</module>
+    <module>hyracks-ipc</module>
     <module>hyracks-comm</module>
     <module>hyracks-client</module>
     <module>hyracks-dataflow-common</module>

Reply via email to