Repository: asterixdb
Updated Branches:
  refs/heads/master 592af6545 -> e6f426b80


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4797ed7..77c352e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -49,6 +49,7 @@ import 
org.apache.hyracks.api.dataflow.connectors.ConnectorPolicyFactory;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
@@ -102,7 +103,7 @@ public class CCNCFunctions {
 
         DISTRIBUTE_JOB,
         DESTROY_JOB,
-        DISTRIBUTED_JOB_FAILURE,
+        DEPLOYED_JOB_FAILURE,
 
         STATE_DUMP_REQUEST,
         STATE_DUMP_RESPONSE,
@@ -286,24 +287,24 @@ public class CCNCFunctions {
         }
     }
 
-    public static class ReportDistributedJobFailureFunction extends Function {
+    public static class ReportDeployedJobSpecFailureFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
         private final String nodeId;
 
-        public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) 
{
-            this.jobId = jobId;
+        public ReportDeployedJobSpecFailureFunction(DeployedJobSpecId 
deployedJobSpecId, String nodeId) {
+            this.deployedJobSpecId = deployedJobSpecId;
             this.nodeId = nodeId;
         }
 
         @Override
         public FunctionId getFunctionId() {
-            return FunctionId.DISTRIBUTED_JOB_FAILURE;
+            return FunctionId.DEPLOYED_JOB_FAILURE;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
 
         public String getNodeId() {
@@ -676,15 +677,15 @@ public class CCNCFunctions {
         }
     }
 
-    public static class DistributeJobFunction extends Function {
+    public static class DeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
 
         private final byte[] acgBytes;
 
-        public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
-            this.jobId = jobId;
+        public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, 
byte[] acgBytes) {
+            this.deployedJobSpecId = deployedJobSpecId;
             this.acgBytes = acgBytes;
         }
 
@@ -693,8 +694,8 @@ public class CCNCFunctions {
             return FunctionId.DISTRIBUTE_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
 
         public byte[] getacgBytes() {
@@ -702,13 +703,13 @@ public class CCNCFunctions {
         }
     }
 
-    public static class DestroyJobFunction extends Function {
+    public static class UndeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
 
-        public DestroyJobFunction(JobId jobId) {
-            this.jobId = jobId;
+        public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+            this.deployedJobSpecId = deployedJobSpecId;
         }
 
         @Override
@@ -716,8 +717,8 @@ public class CCNCFunctions {
             return FunctionId.DESTROY_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
     }
 
@@ -730,16 +731,21 @@ public class CCNCFunctions {
         private final List<TaskAttemptDescriptor> taskDescriptors;
         private final Map<ConnectorDescriptorId, IConnectorPolicy> 
connectorPolicies;
         private final Set<JobFlag> flags;
+        private final Map<byte[], byte[]> jobParameters;
+        private final DeployedJobSpecId deployedJobSpecId;
 
         public StartTasksFunction(DeploymentId deploymentId, JobId jobId, 
byte[] planBytes,
                 List<TaskAttemptDescriptor> taskDescriptors,
-                Map<ConnectorDescriptorId, IConnectorPolicy> 
connectorPolicies, Set<JobFlag> flags) {
+                Map<ConnectorDescriptorId, IConnectorPolicy> 
connectorPolicies, Set<JobFlag> flags,
+                Map<byte[], byte[]> jobParameters, DeployedJobSpecId 
deployedJobSpecId) {
             this.deploymentId = deploymentId;
             this.jobId = jobId;
             this.planBytes = planBytes;
             this.taskDescriptors = taskDescriptors;
             this.connectorPolicies = connectorPolicies;
             this.flags = flags;
+            this.jobParameters = jobParameters;
+            this.deployedJobSpecId = deployedJobSpecId;
         }
 
         @Override
@@ -751,10 +757,18 @@ public class CCNCFunctions {
             return deploymentId;
         }
 
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
+        }
+
         public JobId getJobId() {
             return jobId;
         }
 
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
+        }
+
         public byte[] getPlanBytes() {
             return planBytes;
         }
@@ -815,7 +829,33 @@ public class CCNCFunctions {
                 flags.add(JobFlag.values()[(dis.readInt())]);
             }
 
-            return new StartTasksFunction(deploymentId, jobId, planBytes, 
taskDescriptors, connectorPolicies, flags);
+            // read job parameters
+            int paramListSize = dis.readInt();
+            Map<byte[], byte[]> jobParameters = new HashMap<>();
+            for (int i = 0; i < paramListSize; i++) {
+                int nameLength = dis.readInt();
+                byte[] nameBytes = null;
+                if (nameLength >= 0) {
+                    nameBytes = new byte[nameLength];
+                    dis.read(nameBytes, 0, nameLength);
+                }
+                int valueLength = dis.readInt();
+                byte[] valueBytes = null;
+                if (valueLength >= 0) {
+                    valueBytes = new byte[valueLength];
+                    dis.read(valueBytes, 0, valueLength);
+                }
+                jobParameters.put(nameBytes, valueBytes);
+            }
+
+            //read DeployedJobSpecId
+            DeployedJobSpecId deployedJobSpecId = null;
+            if (dis.readBoolean()) {
+                deployedJobSpecId = DeployedJobSpecId.create(dis);
+            }
+
+            return new StartTasksFunction(deploymentId, jobId, planBytes, 
taskDescriptors, connectorPolicies, flags,
+                    jobParameters, deployedJobSpecId);
         }
 
         public static void serialize(OutputStream out, Object object) throws 
Exception {
@@ -853,6 +893,22 @@ public class CCNCFunctions {
             for (JobFlag flag : fn.flags) {
                 dos.writeInt(flag.ordinal());
             }
+
+            //write job parameters
+            dos.writeInt(fn.jobParameters.size());
+            for (Entry<byte[], byte[]> entry : fn.jobParameters.entrySet()) {
+                dos.writeInt(entry.getKey().length);
+                dos.write(entry.getKey(), 0, entry.getKey().length);
+                dos.writeInt(entry.getValue().length);
+                dos.write(entry.getValue(), 0, entry.getValue().length);
+            }
+
+            //write deployed job spec id
+            dos.writeBoolean(fn.getDeployedJobSpecId() == null ? false : true);
+            if (fn.getDeployedJobSpecId() != null) {
+                fn.getDeployedJobSpecId().writeFields(dos);
+            }
+
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index c09f317..f9af4c6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
-import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
-
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.logging.Logger;
@@ -28,11 +26,30 @@ import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.GetNodeControllersInfoFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NodeHeartbeatFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyDeployBinaryFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyJobletCleanupFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskCompleteFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskFailureFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterNodeFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionProviderFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionRequestFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterResultPartitionLocationFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportDeployedJobSpecFailureFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportProfileFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionWriteCompletionFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpResponseFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UnregisterNodeFunction;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
@@ -151,9 +168,8 @@ public class ClusterControllerRemoteProxy extends 
ControllerRemoteProxy implemen
     }
 
     @Override
-    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws 
Exception {
-        ReportDistributedJobFailureFunction fn = new 
ReportDistributedJobFailureFunction(
-                jobId, nodeId);
+    public void notifyDeployedJobSpecFailure(DeployedJobSpecId 
deployedJobSpecId, String nodeId) throws Exception {
+        ReportDeployedJobSpecFailureFunction fn = new 
ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId);
         ensureIpcHandle().send(-1, fn, null);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 3819ef8..8431eca 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
-import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
-
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
@@ -32,11 +30,24 @@ import 
org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.base.INodeController;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortTasksFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.CleanupJobletFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployBinaryFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployJobSpecFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportPartitionAvailabilityFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownRequestFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StartTasksFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpRequestFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
@@ -61,9 +72,10 @@ public class NodeControllerRemoteProxy extends 
ControllerRemoteProxy implements
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] 
planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, 
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags) throws Exception {
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, 
DeployedJobSpecId deployedJobSpecId)
+            throws Exception {
         StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, 
planBytes,
-                taskDescriptors, connectorPolicies, flags);
+                taskDescriptors, connectorPolicies, flags, jobParameters, 
deployedJobSpecId);
         ensureIpcHandle().send(-1, stf, null);
     }
 
@@ -99,14 +111,14 @@ public class NodeControllerRemoteProxy extends 
ControllerRemoteProxy implements
     }
 
     @Override
-    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
-        DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes);
+    public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] 
planBytes) throws Exception {
+        DeployJobSpecFunction fn = new 
DeployJobSpecFunction(deployedJobSpecId, planBytes);
         ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
-    public void destroyJob(JobId jobId) throws Exception {
-        DestroyJobFunction fn = new DestroyJobFunction(jobId);
+    public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws 
Exception {
+        UndeployJobSpecFunction fn = new 
UndeployJobSpecFunction(deployedJobSpecId);
         ensureIpcHandle().send(-1, fn, null);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 66a5e0f..ce666b0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -100,8 +100,11 @@ public class Joblet implements IHyracksJobletContext, 
ICounterContext {
 
     private boolean cleanupPending;
 
+    private final IJobletEventListenerFactory jobletEventListenerFactory;
+
     public Joblet(NodeControllerService nodeController, DeploymentId 
deploymentId, JobId jobId,
-            INCServiceContext serviceCtx, ActivityClusterGraph acg) {
+            INCServiceContext serviceCtx, ActivityClusterGraph acg,
+            IJobletEventListenerFactory jobletEventListenerFactory) {
         this.nodeController = nodeController;
         this.serviceCtx = serviceCtx;
         this.deploymentId = deploymentId;
@@ -117,9 +120,9 @@ public class Joblet implements IHyracksJobletContext, 
ICounterContext {
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(this, 
serviceCtx.getIoManager());
         cleanupPending = false;
-        IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory();
-        if (jelf != null) {
-            IJobletEventListener listener = jelf.createListener(this);
+        this.jobletEventListenerFactory = jobletEventListenerFactory;
+        if (jobletEventListenerFactory != null) {
+            IJobletEventListener listener = 
jobletEventListenerFactory.createListener(this);
             this.jobletEventListener = listener;
             listener.jobletStart();
         } else {
@@ -134,6 +137,11 @@ public class Joblet implements IHyracksJobletContext, 
ICounterContext {
         return jobId;
     }
 
+    @Override
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return jobletEventListenerFactory;
+    }
+
     public ActivityClusterGraph getActivityClusterGraph() {
         return acg;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 1eb1393..c54f153 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -27,12 +27,12 @@ import org.apache.hyracks.control.nc.work.AbortTasksWork;
 import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
-import org.apache.hyracks.control.nc.work.DestroyJobWork;
-import org.apache.hyracks.control.nc.work.DistributeJobWork;
+import org.apache.hyracks.control.nc.work.DeployJobSpecWork;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
 import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
+import org.apache.hyracks.control.nc.work.UndeployJobSpecWork;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
 
@@ -62,8 +62,10 @@ final class NodeControllerIPCI implements IIPCI {
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = 
(CCNCFunctions.StartTasksFunction) fn;
-                ncs.getWorkQueue().schedule(new StartTasksWork(ncs, 
stf.getDeploymentId(), stf.getJobId(),
-                        stf.getPlanBytes(), stf.getTaskDescriptors(), 
stf.getConnectorPolicies(), stf.getFlags()));
+                ncs.getWorkQueue()
+                        .schedule(new StartTasksWork(ncs, 
stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(),
+                                stf.getTaskDescriptors(), 
stf.getConnectorPolicies(), stf.getFlags(),
+                                stf.getJobParameters(), 
stf.getDeployedJobSpecId()));
                 return;
             case ABORT_TASKS:
                 CCNCFunctions.AbortTasksFunction atf = 
(CCNCFunctions.AbortTasksFunction) fn;
@@ -104,13 +106,13 @@ final class NodeControllerIPCI implements IIPCI {
                 return;
 
             case DISTRIBUTE_JOB:
-                CCNCFunctions.DistributeJobFunction djf = 
(CCNCFunctions.DistributeJobFunction) fn;
-                ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, 
djf.getJobId(), djf.getacgBytes()));
+                CCNCFunctions.DeployJobSpecFunction djf = 
(CCNCFunctions.DeployJobSpecFunction) fn;
+                ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, 
djf.getDeployedJobSpecId(), djf.getacgBytes()));
                 return;
 
             case DESTROY_JOB:
-                CCNCFunctions.DestroyJobFunction dsjf = 
(CCNCFunctions.DestroyJobFunction) fn;
-                ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, 
dsjf.getJobId()));
+                CCNCFunctions.UndeployJobSpecFunction dsjf = 
(CCNCFunctions.UndeployJobSpecFunction) fn;
+                ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, 
dsjf.getDeployedJobSpecId()));
                 return;
 
             case STATE_DUMP_REQUEST:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 7a01b87..9dd9536 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -29,6 +29,7 @@ import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -53,7 +54,9 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
@@ -128,7 +131,9 @@ public class NodeControllerService implements 
IControllerService {
 
     private final Map<JobId, Joblet> jobletMap;
 
-    private final Map<JobId, ActivityClusterGraph> preDistributedJobs;
+    private final Map<Long, ActivityClusterGraph> 
deployedJobSpecActivityClusterGraphMap;
+
+    private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = 
new HashMap<>();
 
     private ExecutorService executor;
 
@@ -202,7 +207,7 @@ public class NodeControllerService implements 
IControllerService {
 
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves 
MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
-        preDistributedJobs = new Hashtable<>();
+        deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -423,28 +428,43 @@ public class NodeControllerService implements 
IControllerService {
         return jobletMap;
     }
 
-    public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph 
acg) throws HyracksException {
-        if (preDistributedJobs.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
jobId);
+    public void removeJobParameterByteStore(JobId jobId) {
+        jobParameterByteStoreMap.remove(jobId);
+    }
+
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) 
throws HyracksException {
+        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+        if (jpbs == null) {
+            jpbs = new JobParameterByteStore();
+            jobParameterByteStoreMap.put(jobId, jpbs);
+        }
+        return jpbs;
+    }
+
+
+    public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, 
ActivityClusterGraph acg)
+            throws HyracksException {
+        if 
(deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) 
{
+            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, 
deployedJobSpecId);
         }
-        preDistributedJobs.put(jobId, acg);
+        deployedJobSpecActivityClusterGraphMap.put(deployedJobSpecId.getId(), 
acg);
     }
 
-    public void removeActivityClusterGraph(JobId jobId) throws 
HyracksException {
-        if (preDistributedJobs.get(jobId) == null) {
-            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+    public void removeActivityClusterGraph(DeployedJobSpecId 
deployedJobSpecId) throws HyracksException {
+        if 
(deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) == null) 
{
+            throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, 
deployedJobSpecId);
         }
-        preDistributedJobs.remove(jobId);
+        
deployedJobSpecActivityClusterGraphMap.remove(deployedJobSpecId.getId());
     }
 
-    public void checkForDuplicateDistributedJob(JobId jobId) throws 
HyracksException {
-        if (preDistributedJobs.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, 
jobId);
+    public void checkForDuplicateDeployedJobSpec(DeployedJobSpecId 
deployedJobSpecId) throws HyracksException {
+        if 
(deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) 
{
+            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, 
deployedJobSpecId);
         }
     }
 
-    public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws 
HyracksException {
-        return preDistributedJobs.get(jobId);
+    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId 
deployedJobSpecId) throws HyracksException {
+        return 
deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId());
     }
 
     public NetworkManager getNetworkManager() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index fcd4bde..94ee92b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -455,6 +455,9 @@ public class Task implements IHyracksTaskContext, 
ICounterContext, Runnable {
     }
 
     @Override
+    public byte[] getJobParameter(byte[] name, int start, int length) throws 
HyracksException {
+        return 
ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, 
start, length);
+    }
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 670ce06..03ae90c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -51,6 +51,7 @@ public class CleanupJobletWork extends AbstractWork {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
+        ncs.removeJobParameterByteStore(jobId);
         final List<IPartition> unregisteredPartitions = new 
ArrayList<IPartition>();
         ncs.getPartitionManager().unregisterPartitions(jobId, 
unregisteredPartitions);
         ncs.getExecutor().execute(new Runnable() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
new file mode 100644
index 0000000..4276b67
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * pre-distribute a job that can be executed later
+ *
+ */
+public class DeployJobSpecWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private final byte[] acgBytes;
+    private final DeployedJobSpecId deployedJobSpecId;
+
+    public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId 
deployedJobSpecId, byte[] acgBytes) {
+        this.ncs = ncs;
+        this.deployedJobSpecId = deployedJobSpecId;
+        this.acgBytes = acgBytes;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId);
+            ActivityClusterGraph acg =
+                    (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
+            ncs.storeActivityClusterGraph(deployedJobSpecId, acg);
+        } catch (HyracksException e) {
+            try {
+                
ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, 
ncs.getId());
+            } catch (Exception e1) {
+                e1.printStackTrace();
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
deleted file mode 100644
index 55dd01e..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.control.nc.work;
-
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.common.work.AbstractWork;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-/**
- * destroy a pre-distributed job
- *
- */
-public class DestroyJobWork extends AbstractWork {
-
-    private final NodeControllerService ncs;
-    private final JobId jobId;
-
-    public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
-        this.ncs = ncs;
-        this.jobId = jobId;
-    }
-
-    @Override
-    public void run() {
-        try {
-            ncs.removeActivityClusterGraph(jobId);
-        } catch (HyracksException e) {
-            try {
-                ncs.getClusterController().notifyDistributedJobFailure(jobId, 
ncs.getId());
-            } catch (Exception e1) {
-                e1.printStackTrace();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
deleted file mode 100644
index 486a420..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.control.nc.work;
-
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
-import org.apache.hyracks.control.common.work.AbstractWork;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-/**
- * pre-distribute a job that can be executed later
- *
- */
-public class DistributeJobWork extends AbstractWork {
-
-    private final NodeControllerService ncs;
-    private final byte[] acgBytes;
-    private final JobId jobId;
-
-    public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] 
acgBytes) {
-        this.ncs = ncs;
-        this.jobId = jobId;
-        this.acgBytes = acgBytes;
-    }
-
-    @Override
-    public void run() {
-        try {
-            ncs.checkForDuplicateDistributedJob(jobId);
-            ncs.updateMaxJobId(jobId);
-            ActivityClusterGraph acg =
-                    (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
-            ncs.storeActivityClusterGraph(jobId, acg);
-        } catch (HyracksException e) {
-            try {
-                ncs.getClusterController().notifyDistributedJobFailure(jobId, 
ncs.getId());
-            } catch (Exception e1) {
-                e1.printStackTrace();
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index c369781..a2fcc25 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -47,11 +47,12 @@ import 
org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.deployment.DeploymentId;
-import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -79,6 +80,8 @@ public class StartTasksWork extends AbstractWork {
 
     private final JobId jobId;
 
+    private final DeployedJobSpecId deployedJobSpecId;
+
     private final byte[] acgBytes;
 
     private final List<TaskAttemptDescriptor> taskDescriptors;
@@ -87,16 +90,21 @@ public class StartTasksWork extends AbstractWork {
 
     private final Set<JobFlag> flags;
 
+    private final Map<byte[], byte[]> jobParameters;
+
     public StartTasksWork(NodeControllerService ncs, DeploymentId 
deploymentId, JobId jobId, byte[] acgBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, 
Set<JobFlag> flags) {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, 
Set<JobFlag> flags,
+            Map<byte[], byte[]> jobParameters, DeployedJobSpecId 
deployedJobSpecId) {
         this.ncs = ncs;
         this.deploymentId = deploymentId;
         this.jobId = jobId;
+        this.deployedJobSpecId = deployedJobSpecId;
         this.acgBytes = acgBytes;
         this.taskDescriptors = taskDescriptors;
         this.connectorPoliciesMap = connectorPoliciesMap;
         this.flags = flags;
+        this.jobParameters = jobParameters;
     }
 
     @Override
@@ -106,7 +114,7 @@ public class StartTasksWork extends AbstractWork {
         try {
             ncs.updateMaxJobId(jobId);
             NCServiceContext serviceCtx = ncs.getContext();
-            Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, 
serviceCtx, acgBytes);
+            Joblet joblet = getOrCreateLocalJoblet(deploymentId, serviceCtx, 
acgBytes);
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
                 @Override
@@ -190,19 +198,22 @@ public class StartTasksWork extends AbstractWork {
         }
     }
 
-    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId 
jobId, INCServiceContext appCtx,
-            byte[] acgBytes) throws HyracksException {
+    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, 
INCServiceContext appCtx, byte[] acgBytes)
+            throws HyracksException {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId);
-            if (acg == null) {
-                if (acgBytes == null) {
-                    throw 
HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+            ActivityClusterGraph acg = (deployedJobSpecId != null) ? 
ncs.getActivityClusterGraph(deployedJobSpecId)
+                    : (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
+            
ncs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
+            IJobletEventListenerFactory listenerFactory = 
acg.getJobletEventListenerFactory();
+            if (listenerFactory != null) {
+                if (deployedJobSpecId != null) {
+                    listenerFactory = 
acg.getJobletEventListenerFactory().copyFactory();
                 }
-                acg = (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
+                
listenerFactory.updateListenerJobParameters(ncs.createOrGetJobParameterByteStore(jobId));
             }
-            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
+            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, 
listenerFactory);
             jobletMap.put(jobId, ji);
         }
         return ji;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
new file mode 100644
index 0000000..4383ff6
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * remove the deployed job
+ *
+ */
+public class UndeployJobSpecWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private final DeployedJobSpecId deployedJobSpecId;
+
+    public UndeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId 
deployedJobSpecId) {
+        this.ncs = ncs;
+        this.deployedJobSpecId = deployedJobSpecId;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.removeActivityClusterGraph(deployedJobSpecId);
+        } catch (HyracksException e) {
+            try {
+                
ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, 
ncs.getId());
+            } catch (Exception e1) {
+                e1.printStackTrace();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
new file mode 100644
index 0000000..dd4fdd1
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.cluster.NodeManager;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DeployedJobSpecsTest {
+    private static final Logger LOGGER = 
Logger.getLogger(DeployedJobSpecsTest.class.getName());
+
+    private static final String NC1_ID = "nc1";
+    private static final String NC2_ID = "nc2";
+    private static final int TIME_THRESHOLD = 5000;
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    @BeforeClass
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.setClientListenAddress("127.0.0.1");
+        ccConfig.setClientListenPort(39000);
+        ccConfig.setClusterListenAddress("127.0.0.1");
+        ccConfig.setClusterListenPort(39001);
+        ccConfig.setProfileDumpPeriod(10000);
+        FileUtils.deleteQuietly(new File(joinPath("target", "data")));
+        FileUtils.copyDirectory(new File("data"), new File(joinPath("target", 
"data")));
+        File outDir = new File("target" + File.separator + 
"ClusterController");
+        outDir.mkdirs();
+        File ccRoot = 
File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
+        ccRoot.delete();
+        ccRoot.mkdir();
+        ccConfig.setRootDir(ccRoot.getAbsolutePath());
+        ClusterControllerService ccBase = new 
ClusterControllerService(ccConfig);
+        // The spying below is dangerous since it replaces the 
ClusterControllerService already referenced by many
+        // objects created in the constructor above
+        cc = Mockito.spy(ccBase);
+        cc.start();
+
+        // The following code partially fixes the problem created by the spying
+        INodeManager nodeManager = cc.getNodeManager();
+        Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs");
+        ccsInNodeManager.setAccessible(true);
+        ccsInNodeManager.set(nodeManager, cc);
+
+        NCConfig ncConfig1 = new NCConfig(NC1_ID);
+        ncConfig1.setClusterAddress("localhost");
+        ncConfig1.setClusterPort(39001);
+        ncConfig1.setClusterListenAddress("127.0.0.1");
+        ncConfig1.setDataListenAddress("127.0.0.1");
+        ncConfig1.setResultListenAddress("127.0.0.1");
+        ncConfig1.setResultSweepThreshold(TIME_THRESHOLD);
+        ncConfig1.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
+        NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
+        nc1 = Mockito.spy(nc1Base);
+        nc1.start();
+
+        NCConfig ncConfig2 = new NCConfig(NC2_ID);
+        ncConfig2.setClusterAddress("localhost");
+        ncConfig2.setClusterPort(39001);
+        ncConfig2.setClusterListenAddress("127.0.0.1");
+        ncConfig2.setDataListenAddress("127.0.0.1");
+        ncConfig2.setResultListenAddress("127.0.0.1");
+        ncConfig2.setResultSweepThreshold(TIME_THRESHOLD);
+        ncConfig2.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
+        NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
+        nc2 = Mockito.spy(nc2Base);
+        nc2.start();
+
+        hcc = new HyracksConnection(ccConfig.getClientListenAddress(), 
ccConfig.getClientListenPort());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+        }
+    }
+
+    @Test
+    public void DistributedTest() throws Exception {
+        JobSpecification spec1 = UnionTest.createUnionJobSpec();
+        JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
+
+        //distribute both jobs
+        DeployedJobSpecId distributedId1 = hcc.deployJobSpec(spec1);
+        DeployedJobSpecId distributedId2 = hcc.deployJobSpec(spec2);
+
+        //make sure it finished
+        //cc will get the store once to check for duplicate insertion and once 
to insert per job
+        verify(cc, 
Mockito.timeout(TIME_THRESHOLD).times(4)).getDeployedJobSpecStore();
+        verify(nc1, 
Mockito.timeout(TIME_THRESHOLD).times(2)).storeActivityClusterGraph(any(), 
any());
+        verify(nc2, 
Mockito.timeout(TIME_THRESHOLD).times(2)).storeActivityClusterGraph(any(), 
any());
+        verify(nc1, 
Mockito.timeout(TIME_THRESHOLD).times(2)).checkForDuplicateDeployedJobSpec(any());
+        verify(nc2, 
Mockito.timeout(TIME_THRESHOLD).times(2)).checkForDuplicateDeployedJobSpec(any());
+
+        //confirm that both jobs are distributed
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) != null 
&& nc2.getActivityClusterGraph(distributedId1) != null);
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) != null 
&& nc2.getActivityClusterGraph(distributedId2) != null);
+        
Assert.assertTrue(cc.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(distributedId1)
 != null);
+        
Assert.assertTrue(cc.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(distributedId2)
 != null);
+
+        //run the first job
+        JobId jobRunId1 = hcc.startJob(distributedId1, new HashMap<>());
+        hcc.waitForCompletion(jobRunId1);
+
+        //Make sure the job parameter map was removed
+        verify(cc, 
Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any());
+        verify(nc1, 
Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any());
+        verify(nc2, 
Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any());
+
+        //destroy the first job
+        hcc.undeployJobSpec(distributedId1);
+
+        //make sure it finished
+        verify(cc, 
Mockito.timeout(TIME_THRESHOLD).times(8)).getDeployedJobSpecStore();
+        verify(nc1, 
Mockito.timeout(TIME_THRESHOLD).times(1)).removeActivityClusterGraph(any());
+        verify(nc2, 
Mockito.timeout(TIME_THRESHOLD).times(1)).removeActivityClusterGraph(any());
+
+        //confirm the first job is destroyed
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) == null 
&& nc2.getActivityClusterGraph(distributedId1) == null);
+        
cc.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(distributedId1);
+
+        //run the second job
+        JobId jobRunId2 = hcc.startJob(distributedId2, new HashMap<>());
+        hcc.waitForCompletion(jobRunId2);
+
+        //Make sure the job parameter map was removed
+        verify(cc, 
Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any());
+        verify(nc1, 
Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any());
+        verify(nc2, 
Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any());
+
+        //run the second job again
+        JobId jobRunId3 = hcc.startJob(distributedId2, new HashMap<>());
+        hcc.waitForCompletion(jobRunId3);
+
+        //Make sure the job parameter map was removed
+        verify(cc, 
Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any());
+        verify(nc1, 
Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any());
+        verify(nc2, 
Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any());
+
+        //destroy the second job
+        hcc.undeployJobSpec(distributedId2);
+
+        //make sure it finished
+        verify(cc, 
Mockito.timeout(TIME_THRESHOLD).times(12)).getDeployedJobSpecStore();
+        verify(nc1, 
Mockito.timeout(TIME_THRESHOLD).times(2)).removeActivityClusterGraph(any());
+        verify(nc2, 
Mockito.timeout(TIME_THRESHOLD).times(2)).removeActivityClusterGraph(any());
+
+        //confirm the second job is destroyed
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) == null 
&& nc2.getActivityClusterGraph(distributedId2) == null);
+        
cc.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(distributedId2);
+
+        //run the second job 100 times in parallel
+        distributedId2 = hcc.deployJobSpec(spec2);
+        for (int i = 0; i < 100; i++) {
+            hcc.startJob(distributedId2, new HashMap<>());
+        }
+    }
+
+    @AfterClass
+    public static void deinit() throws Exception {
+        nc2.stop();
+        nc1.stop();
+        cc.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
deleted file mode 100644
index caba5f6..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import static org.apache.hyracks.util.file.FileUtil.joinPath;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.verify;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.cc.cluster.NodeManager;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class PredistributedJobsTest {
-    private static final Logger LOGGER = 
Logger.getLogger(PredistributedJobsTest.class.getName());
-
-    private static final String NC1_ID = "nc1";
-    private static final String NC2_ID = "nc2";
-
-    private static ClusterControllerService cc;
-    private static NodeControllerService nc1;
-    private static NodeControllerService nc2;
-    private static IHyracksClientConnection hcc;
-
-    @BeforeClass
-    public static void init() throws Exception {
-        CCConfig ccConfig = new CCConfig();
-        ccConfig.setClientListenAddress("127.0.0.1");
-        ccConfig.setClientListenPort(39000);
-        ccConfig.setClusterListenAddress("127.0.0.1");
-        ccConfig.setClusterListenPort(39001);
-        ccConfig.setProfileDumpPeriod(10000);
-        FileUtils.deleteQuietly(new File(joinPath("target", "data")));
-        FileUtils.copyDirectory(new File("data"), new File(joinPath("target", 
"data")));
-        File outDir = new File("target" + File.separator + 
"ClusterController");
-        outDir.mkdirs();
-        File ccRoot = 
File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
-        ccRoot.delete();
-        ccRoot.mkdir();
-        ccConfig.setRootDir(ccRoot.getAbsolutePath());
-        ClusterControllerService ccBase = new 
ClusterControllerService(ccConfig);
-        // The spying below is dangerous since it replaces the 
ClusterControllerService already referenced by many
-        // objects created in the constructor above
-        cc = Mockito.spy(ccBase);
-        cc.start();
-
-        // The following code partially fixes the problem created by the spying
-        INodeManager nodeManager = cc.getNodeManager();
-        Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs");
-        ccsInNodeManager.setAccessible(true);
-        ccsInNodeManager.set(nodeManager, cc);
-
-        NCConfig ncConfig1 = new NCConfig(NC1_ID);
-        ncConfig1.setClusterAddress("localhost");
-        ncConfig1.setClusterPort(39001);
-        ncConfig1.setClusterListenAddress("127.0.0.1");
-        ncConfig1.setDataListenAddress("127.0.0.1");
-        ncConfig1.setResultListenAddress("127.0.0.1");
-        ncConfig1.setResultSweepThreshold(5000);
-        ncConfig1.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
-        NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
-        nc1 = Mockito.spy(nc1Base);
-        nc1.start();
-
-        NCConfig ncConfig2 = new NCConfig(NC2_ID);
-        ncConfig2.setClusterAddress("localhost");
-        ncConfig2.setClusterPort(39001);
-        ncConfig2.setClusterListenAddress("127.0.0.1");
-        ncConfig2.setDataListenAddress("127.0.0.1");
-        ncConfig2.setResultListenAddress("127.0.0.1");
-        ncConfig2.setResultSweepThreshold(5000);
-        ncConfig2.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
-        NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
-        nc2 = Mockito.spy(nc2Base);
-        nc2.start();
-
-        hcc = new HyracksConnection(ccConfig.getClientListenAddress(), 
ccConfig.getClientListenPort());
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
-        }
-    }
-
-    @Test
-    public void DistributedTest() throws Exception {
-        JobSpecification spec1 = UnionTest.createUnionJobSpec();
-        JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
-
-        //distribute both jobs
-        JobId jobId1 = hcc.distributeJob(spec1);
-        JobId jobId2 = hcc.distributeJob(spec2);
-
-        //make sure it finished
-        //cc will get the store once to check for duplicate insertion and once 
to insert per job
-        verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore();
-        verify(nc1, 
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
-        verify(nc2, 
Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
-        verify(nc1, 
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
-        verify(nc2, 
Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
-
-        //confirm that both jobs are distributed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null && 
nc2.getActivityClusterGraph(jobId1) != null);
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null && 
nc2.getActivityClusterGraph(jobId2) != null);
-        
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1)
 != null);
-        
Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2)
 != null);
-
-        //run the first job
-        hcc.startJob(jobId1);
-        hcc.waitForCompletion(jobId1);
-
-        //destroy the first job
-        hcc.destroyJob(jobId1);
-
-        //make sure it finished
-        verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore();
-        verify(nc1, 
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
-        verify(nc2, 
Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
-
-        //confirm the first job is destroyed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null && 
nc2.getActivityClusterGraph(jobId1) == null);
-        
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
-
-        //run the second job
-        hcc.startJob(jobId2);
-        hcc.waitForCompletion(jobId2);
-
-        //wait ten seconds to ensure the result sweeper does not break the job
-        //The result sweeper runs every 5 seconds during the tests
-        Thread.sleep(10000);
-
-        //run the second job again
-        hcc.startJob(jobId2);
-        hcc.waitForCompletion(jobId2);
-
-        //destroy the second job
-        hcc.destroyJob(jobId2);
-
-        //make sure it finished
-        verify(cc, 
Mockito.timeout(5000).times(12)).getPreDistributedJobStore();
-        verify(nc1, 
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
-        verify(nc2, 
Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
-
-        //confirm the second job is destroyed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null && 
nc2.getActivityClusterGraph(jobId2) == null);
-        
cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2);
-    }
-
-    @AfterClass
-    public static void deinit() throws Exception {
-        nc2.stop();
-        nc1.stop();
-        cc.stop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index d3c34dd..e310385 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -26,6 +26,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
@@ -56,10 +57,15 @@ public class TestJobletContext implements 
IHyracksJobletContext {
         return frameManger.allocateFrame(bytes);
     }
 
-    ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int 
newFrameSizeInBytes, boolean copyOldData) throws HyracksDataException {
+    ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int 
newFrameSizeInBytes, boolean copyOldData)
+            throws HyracksDataException {
         return frameManger.reallocateFrame(tobeDeallocate, 
newFrameSizeInBytes, copyOldData);
     }
 
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return null;
+    }
+
     void deallocateFrames(int bytes) {
         frameManger.deallocateFrames(bytes);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 3d13cf9..ac52573 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -163,6 +163,10 @@ public class TestTaskContext implements 
IHyracksTaskContext {
     }
 
     @Override
+    public byte[] getJobParameter(byte[] name, int start, int length) {
+        return new byte[0];
+    }
+
     public Set<JobFlag> getJobFlags() {
         return EnumSet.noneOf(JobFlag.class);
     }

Reply via email to