Repository: asterixdb
Updated Branches:
  refs/heads/master 92c0bac18 -> 34ee3335a


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4ee34ca..4eb1732 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -100,6 +100,10 @@ public class CCNCFunctions {
         SHUTDOWN_REQUEST,
         SHUTDOWN_RESPONSE,
 
+        DISTRIBUTE_JOB,
+        DESTROY_JOB,
+        DISTRIBUTED_JOB_FAILURE,
+
         STATE_DUMP_REQUEST,
         STATE_DUMP_RESPONSE,
 
@@ -282,6 +286,31 @@ public class CCNCFunctions {
         }
     }
 
+    public static class ReportDistributedJobFailureFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final String nodeId;
+
+        public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) 
{
+            this.jobId = jobId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DISTRIBUTED_JOB_FAILURE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
     public static class NotifyJobletCleanupFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -670,6 +699,51 @@ public class CCNCFunctions {
         }
     }
 
+    public static class DistributeJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final byte[] acgBytes;
+
+        public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
+            this.jobId = jobId;
+            this.acgBytes = acgBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DISTRIBUTE_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getacgBytes() {
+            return acgBytes;
+        }
+    }
+
+    public static class DestroyJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public DestroyJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
     public static class StartTasksFunction extends Function {
         private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ac6fc2c..83ef32b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -141,6 +141,13 @@ public class ClusterControllerRemoteProxy implements 
IClusterController {
     }
 
     @Override
+    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws 
Exception {
+        CCNCFunctions.ReportDistributedJobFailureFunction fn =
+                new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, 
nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void getNodeControllerInfos() throws Exception {
         ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), 
null);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 0d59b8d..2a8464e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -84,6 +84,18 @@ public class NodeControllerRemoteProxy implements 
INodeController {
     }
 
     @Override
+    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
+        CCNCFunctions.DistributeJobFunction fn = new 
CCNCFunctions.DistributeJobFunction(jobId, planBytes);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void destroyJob(JobId jobId) throws Exception {
+        CCNCFunctions.DestroyJobFunction fn = new 
CCNCFunctions.DestroyJobFunction(jobId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void dumpState(String stateDumpId) throws Exception {
         CCNCFunctions.StateDumpRequestFunction dsf = new 
CCNCFunctions.StateDumpRequestFunction(stateDumpId);
         ipcHandle.send(-1, dsf, null);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 93ccaa4..e6278be 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -26,6 +26,8 @@ import org.apache.hyracks.control.nc.work.AbortTasksWork;
 import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.DestroyJobWork;
+import org.apache.hyracks.control.nc.work.DistributeJobWork;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -99,6 +101,16 @@ final class NodeControllerIPCI implements IIPCI {
                 ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, 
ndbf.getDeploymentId()));
                 return;
 
+            case DISTRIBUTE_JOB:
+                CCNCFunctions.DistributeJobFunction djf = 
(CCNCFunctions.DistributeJobFunction) fn;
+                ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, 
djf.getJobId(), djf.getacgBytes()));
+                return;
+
+            case DESTROY_JOB:
+                CCNCFunctions.DestroyJobFunction dsjf = 
(CCNCFunctions.DestroyJobFunction) fn;
+                ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, 
dsjf.getJobId()));
+                return;
+
             case STATE_DUMP_REQUEST:
                 final CCNCFunctions.StateDumpRequestFunction dsrf = 
(StateDumpRequestFunction) fn;
                 ncs.getWorkQueue().schedule(new StateDumpWork(ncs, 
dsrf.getStateDumpId()));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 19f01c1..bf0ddb6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -46,6 +46,8 @@ import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
@@ -94,11 +96,11 @@ public class NodeControllerService implements 
IControllerService {
 
     private final IOManager ioManager;
 
-    private final IPCSystem ipc;
+    private IPCSystem ipc;
 
-    private final PartitionManager partitionManager;
+    private PartitionManager partitionManager;
 
-    private final NetworkManager netManager;
+    private NetworkManager netManager;
 
     private IDatasetPartitionManager datasetPartitionManager;
 
@@ -155,18 +157,11 @@ public class NodeControllerService implements 
IControllerService {
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
-        ipc = new IPCSystem(new 
InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
-                new NodeControllerIPCI(this),
-                new CCNCFunctions.SerializerDeserializer());
 
         ioManager = new 
IOManager(IODeviceHandle.getDevices(ncConfig.ioDevices));
         if (id == null) {
             throw new Exception("id not set");
         }
-        partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(ncConfig.dataIPAddress, 
ncConfig.dataPort, partitionManager,
-                ncConfig.nNetThreads, ncConfig.nNetBuffers, 
ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
-                FullFrameChannelInterfaceFactory.INSTANCE);
 
         lccm = new LifeCycleComponentManager();
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves 
MAX_PRIORITY of the heartbeat thread.
@@ -244,7 +239,13 @@ public class NodeControllerService implements 
IControllerService {
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
+        ipc = new IPCSystem(new 
InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
+                new NodeControllerIPCI(this), new 
CCNCFunctions.SerializerDeserializer());
         ipc.start();
+        partitionManager = new PartitionManager(this);
+        netManager = new NetworkManager(ncConfig.dataIPAddress, 
ncConfig.dataPort, partitionManager,
+                ncConfig.nNetThreads, ncConfig.nNetBuffers, 
ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
+                FullFrameChannelInterfaceFactory.INSTANCE);
         netManager.start();
 
         startApplication();
@@ -365,8 +366,28 @@ public class NodeControllerService implements 
IControllerService {
         return jobletMap;
     }
 
-    public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() {
-        return activityClusterGraphMap;
+    public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph 
acg) throws HyracksException {
+        if (activityClusterGraphMap.get(jobId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
jobId);
+        }
+        activityClusterGraphMap.put(jobId, acg);
+    }
+
+    public void removeActivityClusterGraph(JobId jobId) throws 
HyracksException {
+        if (activityClusterGraphMap.get(jobId) == null) {
+            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+        }
+        activityClusterGraphMap.remove(jobId);
+    }
+
+    public void checkForDuplicateDistributedJob(JobId jobId) throws 
HyracksException {
+        if (activityClusterGraphMap.get(jobId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
jobId);
+        }
+    }
+
+    public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws 
HyracksException {
+        return activityClusterGraphMap.get(jobId);
     }
 
     public NetworkManager getNetworkManager() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
new file mode 100644
index 0000000..55dd01e
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * destroy a pre-distributed job
+ *
+ */
+public class DestroyJobWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private final JobId jobId;
+
+    public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.removeActivityClusterGraph(jobId);
+        } catch (HyracksException e) {
+            try {
+                ncs.getClusterController().notifyDistributedJobFailure(jobId, 
ncs.getId());
+            } catch (Exception e1) {
+                e1.printStackTrace();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
new file mode 100644
index 0000000..3a4f6ac
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * pre-distribute a job that can be executed later
+ *
+ */
+public class DistributeJobWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private final byte[] acgBytes;
+    private final JobId jobId;
+
+    public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] 
acgBytes) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+        this.acgBytes = acgBytes;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.checkForDuplicateDistributedJob(jobId);
+            ActivityClusterGraph acg =
+                    (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, null, ncs.getApplicationContext());
+            ncs.storeActivityClusterGraph(jobId, acg);
+        } catch (HyracksException e) {
+            try {
+                ncs.getClusterController().notifyDistributedJobFailure(jobId, 
ncs.getId());
+            } catch (Exception e1) {
+                e1.printStackTrace();
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 803f15a..6cd9fa2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -46,6 +46,7 @@ import 
org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
@@ -186,17 +187,12 @@ public class StartTasksWork extends AbstractWork {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            Map<JobId, ActivityClusterGraph> acgMap = 
ncs.getActivityClusterGraphMap();
-            ActivityClusterGraph acg = acgMap.get(jobId);
+            ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId);
             if (acg == null) {
                 if (acgBytes == null) {
-                    throw new HyracksException("Joblet was not found. This job 
was most likely aborted.");
+                    throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
                 }
                 acg = (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
-                if (flags.contains(JobFlag.STORE_JOB)) {
-                    //TODO: Right now the map is append-only
-                    acgMap.put(jobId, acg);
-                }
             }
             ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index b51a578..a7677f8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -63,10 +63,10 @@ public abstract class AbstractIntegrationTest {
     public static final String NC1_ID = "nc1";
     public static final String NC2_ID = "nc2";
 
-    private static ClusterControllerService cc;
+    protected static ClusterControllerService cc;
     protected static NodeControllerService nc1;
     protected static NodeControllerService nc2;
-    private static IHyracksClientConnection hcc;
+    protected static IHyracksClientConnection hcc;
 
     private final List<File> outputFiles;
     private static AtomicInteger aInteger = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index efbb9d2..160336a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -114,6 +114,11 @@ public class HeapSortMergeTest extends 
AbstractIntegrationTest {
 
     @Test
     public void optimizedSortMergeTest02() throws Exception {
+        JobSpecification spec = createSortMergeJobSpec();
+        runTest(spec);
+    }
+
+    public static JobSpecification createSortMergeJobSpec() throws Exception {
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
@@ -156,19 +161,17 @@ public class HeapSortMergeTest extends 
AbstractIntegrationTest {
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, 
sorter, 0);
 
-        spec.connect(
-                new MToNPartitioningMergingConnectorDescriptor(spec, new 
FieldHashPartitionComputerFactory(new int[] {
-                        1, 0 }, new IBinaryHashFunctionFactory[] {
-                        
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new 
int[] { 1, 0 },
-                        new IBinaryComparatorFactory[] {
-                                
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 
0, filter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new 
FieldHashPartitionComputerFactory(
+                new int[] { 1, 0 },
+                new IBinaryHashFunctionFactory[] { 
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, 
filter, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, 
printer, 0);
-
-        runTest(spec);
+        return spec;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
new file mode 100644
index 0000000..2509515
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PredistributedJobsTest {
+    private static final Logger LOGGER = 
Logger.getLogger(PredistributedJobsTest.class.getName());
+
+    private static final String NC1_ID = "nc1";
+    private static final String NC2_ID = "nc2";
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    @BeforeClass
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = 39000;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = 39001;
+        ccConfig.profileDumpPeriod = 10000;
+        FileUtils.deleteQuietly(new File("target" + File.separator + "data"));
+        FileUtils.copyDirectory(new File("data"), new File("target" + 
File.separator + "data"));
+        File outDir = new File("target" + File.separator + 
"ClusterController");
+        outDir.mkdirs();
+        File ccRoot = 
File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
+        ccRoot.delete();
+        ccRoot.mkdir();
+        ccConfig.ccRoot = ccRoot.getAbsolutePath();
+        ClusterControllerService ccBase = new 
ClusterControllerService(ccConfig);
+        cc = Mockito.spy(ccBase);
+        cc.start();
+
+        NCConfig ncConfig1 = new NCConfig();
+        ncConfig1.ccHost = "localhost";
+        ncConfig1.ccPort = 39001;
+        ncConfig1.clusterNetIPAddress = "127.0.0.1";
+        ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.resultIPAddress = "127.0.0.1";
+        ncConfig1.nodeId = NC1_ID;
+        ncConfig1.ioDevices = System.getProperty("user.dir") + File.separator 
+ "target" + File.separator + "data"
+                + File.separator + "device0";
+        NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
+        nc1 = Mockito.spy(nc1Base);
+        nc1.start();
+
+        NCConfig ncConfig2 = new NCConfig();
+        ncConfig2.ccHost = "localhost";
+        ncConfig2.ccPort = 39001;
+        ncConfig2.clusterNetIPAddress = "127.0.0.1";
+        ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.resultIPAddress = "127.0.0.1";
+        ncConfig2.nodeId = NC2_ID;
+        ncConfig2.ioDevices = System.getProperty("user.dir") + File.separator 
+ "target" + File.separator + "data"
+                + File.separator + "device1";
+        NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
+        nc2 = Mockito.spy(nc2Base);
+        nc2.start();
+
+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, 
ccConfig.clientNetPort);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+        }
+    }
+
+    @Test
+    public void DistributedTest() throws Exception {
+        JobSpecification spec1 = UnionTest.createUnionJobSpec();
+        JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
+
+        //distribute both jobs
+        JobId jobId1 = hcc.distributeJob(spec1);
+        JobId jobId2 = hcc.distributeJob(spec2);
+
+        //make sure it finished
+        //cc will get the store once to check for duplicate insertion and once 
to insert per job
+        verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore();
+        verify(nc1, 
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
+        verify(nc2, 
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
+        verify(nc1, 
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
+        verify(nc2, 
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
+
+        //confirm that both jobs are distributed
+        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null && 
nc2.getActivityClusterGraph(jobId1) != null);
+        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null && 
nc2.getActivityClusterGraph(jobId2) != null);
+        
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1)
 != null);
+        
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2)
 != null);
+
+        //run the first job
+        hcc.startJob(jobId1);
+        hcc.waitForCompletion(jobId1);
+
+        //destroy the first job
+        hcc.destroyJob(jobId1);
+
+        //make sure it finished
+        verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore();
+        verify(nc1, 
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
+        verify(nc2, 
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
+
+        //confirm the first job is destroyed
+        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null && 
nc2.getActivityClusterGraph(jobId1) == null);
+        
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
+
+        //run the second job
+        hcc.startJob(jobId2);
+        hcc.waitForCompletion(jobId2);
+
+        //run the second job again
+        hcc.startJob(jobId2);
+        hcc.waitForCompletion(jobId2);
+
+        //destroy the second job
+        hcc.destroyJob(jobId2);
+
+        //make sure it finished
+        verify(cc, 
Mockito.timeout(5000).times(12)).getPreDistributedJobStore();
+        verify(nc1, 
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
+        verify(nc2, 
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
+
+        //confirm the second job is destroyed
+        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null && 
nc2.getActivityClusterGraph(jobId2) == null);
+        
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2);
+    }
+
+    @AfterClass
+    public static void deinit() throws Exception {
+        nc2.stop();
+        nc1.stop();
+        cc.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/34ee3335/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 02cab8f..542f037 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -44,6 +44,11 @@ import org.junit.Test;
 public class UnionTest extends AbstractIntegrationTest {
     @Test
     public void union01() throws Exception {
+        JobSpecification spec = createUnionJobSpec();
+        runTest(spec);
+    }
+
+    public static JobSpecification createUnionJobSpec() throws Exception {
         JobSpecification spec = new JobSpecification();
 
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new 
FileSplit[] {
@@ -82,6 +87,6 @@ public class UnionTest extends AbstractIntegrationTest {
         spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, 
printer, 0);
 
         spec.addRoot(printer);
-        runTest(spec);
+        return spec;
     }
 }

Reply via email to