[ASTERIXDB-1911][HYR,RT,CLUS] Fixes and Improvements for Deployed Jobs

Rename Predistributed Jobs to Deployed Jobs
Enable job executions to have a map of job parameters
Add an Asterix function to retrieve these parameters
which can be assigned when the job is run, e.g. for Deployed jobs
Allow Deployed jobs to have new TxnIds and JobIds for each execution
Allow simultaneous execution of one Deployed Job

Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2045
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Xikui Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e6f426b8
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e6f426b8
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e6f426b8

Branch: refs/heads/master
Commit: e6f426b8019957dd15962926788edbe655218cb9
Parents: 592af65
Author: Steven Glenn Jacobs <[email protected]>
Authored: Tue Nov 14 13:56:19 2017 -0800
Committer: Steven Jacobs <[email protected]>
Committed: Tue Nov 14 17:02:17 2017 -0800

----------------------------------------------------------------------
 asterixdb/asterix-active/pom.xml                |   5 +
 .../asterix/active/DeployedJobService.java      | 108 ++++++++++
 .../asterix/app/translator/QueryTranslator.java |  10 +-
 .../app/bootstrap/TestNodeController.java       |   3 +
 .../common/api/IJobEventListenerFactory.java    |  30 +++
 .../statement/CreateFunctionStatement.java      |   6 +-
 .../lang/common/visitor/FormatPrintVisitor.java |   7 +-
 .../asterix/metadata/utils/DatasetUtil.java     |   3 +-
 .../asterix/om/functions/BuiltinFunctions.java  |   6 +
 .../AbstractUnaryStringStringEval.java          |   3 +-
 .../GetJobParameterByNameDescriptor.java        |  77 +++++++
 .../runtime/functions/FunctionCollection.java   |   8 +-
 .../job/listener/JobEventListenerFactory.java   |  30 ++-
 ...tiTransactionJobletEventListenerFactory.java |  20 +-
 .../LockThenSearchOperationCallbackFactory.java |   6 +-
 ...exInstantSearchOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...maryIndexSearchOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 ...dexModificationOperationCallbackFactory.java |   6 +-
 .../UpsertOperationCallbackFactory.java         |   6 +-
 .../runtime/CommitRuntimeFactory.java           |   8 +-
 .../client/HyracksClientInterfaceFunctions.java |  52 +++--
 .../HyracksClientInterfaceRemoteProxy.java      |  21 +-
 .../hyracks/api/client/HyracksConnection.java   |  19 +-
 .../api/client/IHyracksClientConnection.java    |  22 +-
 .../api/client/IHyracksClientInterface.java     |   7 +-
 .../impl/ActivityClusterGraphBuilder.java       |   6 +-
 ...ionActivityClusterGraphGeneratorFactory.java |   7 +-
 .../api/context/IHyracksJobletContext.java      |   3 +
 .../api/context/IHyracksTaskContext.java        |   3 +
 .../hyracks/api/exceptions/ErrorCode.java       |   6 +-
 .../hyracks/api/job/ActivityClusterId.java      |  19 +-
 .../hyracks/api/job/DeployedJobSpecId.java      |  91 ++++++++
 .../api/job/DeployedJobSpecIdFactory.java       |  34 +++
 .../IActivityClusterGraphGeneratorFactory.java  |   2 +-
 .../api/job/IJobletEventListenerFactory.java    |   7 +-
 .../hyracks/api/job/JobParameterByteStore.java  |  64 ++++++
 .../hyracks/control/cc/ClientInterfaceIPCI.java |  36 ++--
 .../control/cc/ClusterControllerIPCI.java       |  61 +++---
 .../control/cc/ClusterControllerService.java    |  36 +++-
 .../control/cc/DeployedJobSpecStore.java        | 102 +++++++++
 .../control/cc/PreDistributedJobStore.java      | 104 ----------
 .../cc/dataset/DatasetDirectoryService.java     |  12 +-
 .../control/cc/executor/JobExecutor.java        |  16 +-
 .../hyracks/control/cc/job/JobManager.java      |   5 +-
 .../apache/hyracks/control/cc/job/JobRun.java   |  17 +-
 .../control/cc/work/DeployJobSpecWork.java      |  76 +++++++
 .../control/cc/work/DeployedJobFailureWork.java |  39 ++++
 .../hyracks/control/cc/work/DestroyJobWork.java |  52 -----
 .../control/cc/work/DistributeJobWork.java      |  80 -------
 .../cc/work/DistributedJobFailureWork.java      |  39 ----
 .../hyracks/control/cc/work/JobStartWork.java   |  26 ++-
 .../control/cc/work/UndeployJobSpecWork.java    |  53 +++++
 .../control/common/base/IClusterController.java |   3 +-
 .../control/common/base/INodeController.java    |   9 +-
 .../control/common/ipc/CCNCFunctions.java       | 100 +++++++--
 .../ipc/ClusterControllerRemoteProxy.java       |  26 ++-
 .../common/ipc/NodeControllerRemoteProxy.java   |  28 ++-
 .../org/apache/hyracks/control/nc/Joblet.java   |  16 +-
 .../hyracks/control/nc/NodeControllerIPCI.java  |  18 +-
 .../control/nc/NodeControllerService.java       |  50 +++--
 .../org/apache/hyracks/control/nc/Task.java     |   3 +
 .../control/nc/work/CleanupJobletWork.java      |   1 +
 .../control/nc/work/DeployJobSpecWork.java      |  62 ++++++
 .../hyracks/control/nc/work/DestroyJobWork.java |  54 -----
 .../control/nc/work/DistributeJobWork.java      |  63 ------
 .../hyracks/control/nc/work/StartTasksWork.java |  33 ++-
 .../control/nc/work/UndeployJobSpecWork.java    |  54 +++++
 .../tests/integration/DeployedJobSpecsTest.java | 206 +++++++++++++++++++
 .../integration/PredistributedJobsTest.java     | 186 -----------------
 .../hyracks/test/support/TestJobletContext.java |   8 +-
 .../hyracks/test/support/TestTaskContext.java   |   4 +
 74 files changed, 1554 insertions(+), 859 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-active/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 5865cd1..6b0c381 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,6 +31,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-transactions</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
new file mode 100644
index 0000000..070d838
--- /dev/null
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import 
org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Provides functionality for running DeployedJobSpecs
+ */
+public class DeployedJobService {
+
+    private static final Logger LOGGER = 
Logger.getLogger(DeployedJobService.class.getName());
+
+    //To enable new Asterix TxnId for separate deployed job spec invocations
+    private static final byte[] TRANSACTION_ID_PARAMETER_NAME = 
"TxnIdParameter".getBytes();
+
+    //pool size one (only running one thread at a time)
+    private static final int POOL_SIZE = 1;
+
+    //Starts running a deployed job specification periodically with an 
interval of "duration" seconds
+    public static ScheduledExecutorService 
startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
+            IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> 
jobParameters, EntityId entityId) {
+        ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(POOL_SIZE);
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (!runRepetitiveDeployedJobSpec(distributedId, hcc, 
jobParameters, duration, entityId)) {
+                        scheduledExecutorService.shutdown();
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Job Failed to run for " + 
entityId.getExtensionName() + " "
+                            + entityId.getDataverse() + "." + 
entityId.getEntityName() + ".", e);
+                }
+            }
+        }, duration, duration, TimeUnit.MILLISECONDS);
+        return scheduledExecutorService;
+    }
+
+    public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId 
distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, long duration, EntityId 
entityId) throws Exception {
+        long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, 
jobParameters, entityId);
+        if (executionMilliseconds > duration && 
LOGGER.isLoggable(Level.SEVERE)) {
+            LOGGER.log(Level.SEVERE,
+                    "Periodic job for " + entityId.getExtensionName() + " " + 
entityId.getDataverse() + "."
+                            + entityId.getEntityName() + " was unable to meet 
the required period of " + duration
+                            + " milliseconds. Actually took " + 
executionMilliseconds + " execution will shutdown"
+                            + new Date());
+            return false;
+        }
+        return true;
+    }
+
+    public synchronized static long runDeployedJobSpec(DeployedJobSpecId 
distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, EntityId entityId) throws 
Exception {
+        JobId jobId;
+        long startTime = Instant.now().toEpochMilli();
+
+        //Add the Asterix Transaction Id to the map
+        jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, 
String.valueOf(TxnIdFactory.create().getId()).getBytes());
+        jobId = hcc.startJob(distributedId, jobParameters);
+
+        hcc.waitForCompletion(jobId);
+        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
+
+        LOGGER.log(Level.INFO,
+                "Deployed Job execution completed for " + 
entityId.getExtensionName() + " " + entityId.getDataverse()
+                        + "."
+                        + entityId.getEntityName() + ". Took " + 
executionMilliseconds + " milliseconds ");
+
+        return executionMilliseconds;
+
+    }
+
+    @Override
+    public String toString() {
+        return "DeployedJobSpecService";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a811454..a52d765 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -744,7 +744,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
         return DatasetUtil.createNodeGroupForNewDataset(dataverseName, 
datasetName, selectedNodes, metadataProvider);
     }
 
-    protected void handleCreateIndexStatement(MetadataProvider 
metadataProvider, Statement stmt,
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, 
Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters 
requestParameters) throws Exception {
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtCreateIndex.getDataverseName());
@@ -1672,9 +1672,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
     protected void handleCreateFunctionStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-        String dataverse = 
getActiveDataverseName(cfs.getSignature().getNamespace());
-        cfs.getSignature().setNamespace(dataverse);
-        String functionName = cfs.getaAterixFunction().getName();
+        String dataverse = 
getActiveDataverseName(cfs.getFunctionSignature().getNamespace());
+        cfs.getFunctionSignature().setNamespace(dataverse);
+        String functionName = cfs.getFunctionSignature().getName();
 
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1685,7 +1685,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             if (dv == null) {
                 throw new AlgebricksException("There is no dataverse with this 
name " + dataverse + ".");
             }
-            Function function = new Function(dataverse, functionName, 
cfs.getaAterixFunction().getArity(),
+            Function function = new Function(dataverse, functionName, 
cfs.getFunctionSignature().getArity(),
                     cfs.getParamList(), Function.RETURNTYPE_VOID, 
cfs.getFunctionBody(),
                     rewriterFactory instanceof SqlppRewriterFactory ? 
Function.LANGUAGE_SQLPP : Function.LANGUAGE_AQL,
                     FunctionKind.SCALAR.toString());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index df8eef7..90b9a1e 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -58,6 +58,7 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import 
org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
@@ -298,6 +299,8 @@ public class TestNodeController {
         }
         JobId jobId = newJobId();
         IHyracksJobletContext jobletCtx = 
Mockito.mock(IHyracksJobletContext.class);
+        JobEventListenerFactory factory = new JobEventListenerFactory(new 
TxnId(jobId.getId()), true);
+        
Mockito.when(jobletCtx.getJobletEventListenerFactory()).thenReturn(factory);
         
Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
         Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
         ctx = Mockito.spy(ctx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
new file mode 100644
index 0000000..0f37b13
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+
+/**
+ * an interface for JobEventListenerFactories to add Asterix transaction JobId 
getter
+ */
+public interface IJobEventListenerFactory extends IJobletEventListenerFactory {
+
+    TxnId getTxnId(TxnId compiledTxnId);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index 6d74957..84d66ce 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -34,10 +34,6 @@ public class CreateFunctionStatement implements Statement {
     private final boolean ifNotExists;
     private final List<String> paramList;
 
-    public FunctionSignature getaAterixFunction() {
-        return signature;
-    }
-
     public String getFunctionBody() {
         return functionBody;
     }
@@ -66,7 +62,7 @@ public class CreateFunctionStatement implements Statement {
         return paramList;
     }
 
-    public FunctionSignature getSignature() {
+    public FunctionSignature getFunctionSignature() {
         return signature;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index a2e7341..c74ee5d 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -232,7 +232,7 @@ public class FormatPrintVisitor implements 
ILangVisitor<Void, Integer> {
             out.print("(");
             exprList.get(0).accept(this, step + 1);
             for (int i = 1; i < exprList.size(); i++) {
-                OperatorType opType = opList.get(i - 1);;
+                OperatorType opType = opList.get(i - 1);
                 if (i == 1) {
                     printHints(operatorExpr.getHints(), step + 1);
                 }
@@ -657,7 +657,7 @@ public class FormatPrintVisitor implements 
ILangVisitor<Void, Integer> {
         out.print(skip(step) + CREATE + " index ");
         out.print(normalize(cis.getIndexName().getValue()) + " ");
         out.print(generateIfNotExists(cis.getIfNotExists()));
-        out.print(" on ");;
+        out.print(" on ");
         out.print(generateFullName(cis.getDataverseName(), 
cis.getDatasetName()));
 
         out.print(" (");
@@ -827,7 +827,8 @@ public class FormatPrintVisitor implements 
ILangVisitor<Void, Integer> {
     public Void visit(CreateFunctionStatement cfs, Integer step) throws 
CompilationException {
         out.print(skip(step) + CREATE + " function ");
         out.print(generateIfNotExists(cfs.getIfNotExists()));
-        out.print(this.generateFullName(cfs.getSignature().getNamespace(), 
cfs.getSignature().getName()));
+        out.print(
+                
this.generateFullName(cfs.getFunctionSignature().getNamespace(), 
cfs.getFunctionSignature().getName()));
         out.print("(");
         printDelimitedStrings(cfs.getParamList(), COMMA);
         out.println(") {");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 07d3c69..e4c4860 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -205,7 +205,6 @@ public class DatasetUtil {
         return new Pair<>(mergePolicyFactory, properties);
     }
 
-    @SuppressWarnings("unchecked")
     public static void writePropertyTypeRecord(String name, String value, 
DataOutput out, ARecordType recordType)
             throws HyracksDataException {
         IARecordBuilder propertyRecordBuilder = new RecordBuilder();
@@ -399,7 +398,7 @@ public class DatasetUtil {
                 metadataProvider.getSplitProviderAndConstraints(dataset);
 
         // prepare callback
-        TxnId txnId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getTxnId();
+        TxnId txnId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getTxnId(null);
         int[] primaryKeyFields = new int[numKeys];
         for (int i = 0; i < numKeys; i++) {
             primaryKeyFields[i] = i;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index cdb0dc0..e59c600 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -864,6 +864,9 @@ public class BuiltinFunctions {
     public static final FunctionIdentifier EXTERNAL_LOOKUP = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "external-lookup", FunctionIdentifier.VARARGS);
 
+    public static final FunctionIdentifier GET_JOB_PARAMETER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"get-job-param", 1);
+
     public static final FunctionIdentifier META = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta",
             FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier META_KEY = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key",
@@ -1274,6 +1277,9 @@ public class BuiltinFunctions {
         // external lookup
         addPrivateFunction(EXTERNAL_LOOKUP, AnyTypeComputer.INSTANCE, false);
 
+        // get job parameter
+        addFunction(GET_JOB_PARAMETER, AnyTypeComputer.INSTANCE, false);
+
         // unnesting function
         addPrivateFunction(SCAN_COLLECTION, 
CollectionMemberResultType.INSTANCE, true);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
index 1f9909c..95b6ef6 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
@@ -22,8 +22,8 @@ package org.apache.asterix.runtime.evaluators.functions;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -57,7 +57,6 @@ abstract class AbstractUnaryStringStringEval implements 
IScalarEvaluator {
         this.funcID = funcID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void evaluate(IFrameTupleReference tuple, IPointable 
resultPointable) throws HyracksDataException {
         resultStorage.reset();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
new file mode 100644
index 0000000..17f7a96
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import 
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class GetJobParameterByNameDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new GetJobParameterByNameDescriptor();
+        }
+    };
+
+    @Override
+    public IScalarEvaluatorFactory 
createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext 
ctx) throws HyracksDataException {
+                return new AbstractUnaryStringStringEval(ctx, args[0],
+                        GetJobParameterByNameDescriptor.this.getIdentifier()) {
+                    private byte[] result;
+
+                    @Override
+                    protected void process(UTF8StringPointable inputString, 
IPointable resultPointable)
+                            throws IOException {
+                        result = 
ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
+                                inputString.getLength());
+                    }
+
+                    @Override
+                    void writeResult(IPointable resultPointable) throws 
IOException {
+                        resultPointable.set(result, 0, result.length);
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.GET_JOB_PARAMETER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 5acbeb9..f3eb6c9 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -158,6 +158,7 @@ import 
org.apache.asterix.runtime.evaluators.functions.EditDistanceStringIsFilte
 import 
org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.FullTextContainsWithoutOptionDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GetItemDescriptor;
+import 
org.apache.asterix.runtime.evaluators.functions.GetJobParameterByNameDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GramTokensDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.HashedGramTokensDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.HashedWordTokensDescriptor;
@@ -437,6 +438,9 @@ public final class FunctionCollection {
         // Inject failure function
         fc.add(InjectFailureDescriptor.FACTORY);
 
+        // Get Job Parameter function
+        fc.add(GetJobParameterByNameDescriptor.FACTORY);
+
         // Switch case
         fc.add(SwitchCaseDescriptor.FACTORY);
 
@@ -737,8 +741,8 @@ public final class FunctionCollection {
      */
     private static IFunctionDescriptorFactory 
getGeneratedFunctionDescriptorFactory(Class<?> cl) {
         try {
-            String className = 
CodeGenHelper.getGeneratedClassName(cl.getName(),
-                    CodeGenHelper.DEFAULT_SUFFIX_FOR_GENERATED_CLASS);
+            String className =
+                    CodeGenHelper.getGeneratedClassName(cl.getName(), 
CodeGenHelper.DEFAULT_SUFFIX_FOR_GENERATED_CLASS);
             Class<?> generatedCl = cl.getClassLoader().loadClass(className);
             Field factory = generatedCl.getDeclaredField(FACTORY);
             return (IFunctionDescriptorFactory) factory.get(null);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 1422e42..23ad1e1 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
@@ -27,14 +28,19 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.JobStatus;
 
-public class JobEventListenerFactory implements IJobletEventListenerFactory {
+public class JobEventListenerFactory implements IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
-    private final TxnId txnId;
+
+    private TxnId txnId;
     private final boolean transactionalWrite;
 
+    //To enable new Asterix TxnId for separate deployed job spec invocations
+    private static final byte[] TRANSACTION_ID_PARAMETER_NAME = 
"TxnIdParameter".getBytes();
+
     public JobEventListenerFactory(TxnId txnId, boolean transactionalWrite) {
         this.txnId = txnId;
         this.transactionalWrite = transactionalWrite;
@@ -45,6 +51,26 @@ public class JobEventListenerFactory implements 
IJobletEventListenerFactory {
     }
 
     @Override
+    public TxnId getTxnId(TxnId compiledTxnId) { // Returns this factory's own txnId field; the compiledTxnId argument is not used.
+        return txnId; // may have been replaced by updateListenerJobParameters, which assigns this field
+    }
+
+    @Override
+    public IJobletEventListenerFactory copyFactory() { // Creates a separate factory instance from this one's current field values.
+        return new JobEventListenerFactory(txnId, transactionalWrite); // the copy has its own txnId field, so reassigning it does not alter this instance
+    }
+
+    @Override
+    public void updateListenerJobParameters(JobParameterByteStore 
jobParameterByteStore) { // Replaces txnId with the value stored under the "TxnIdParameter" job parameter, when that value is non-empty.
+        String AsterixTransactionIdString =
+                new 
String(jobParameterByteStore.getParameterValue(TRANSACTION_ID_PARAMETER_NAME, 0,
+                        TRANSACTION_ID_PARAMETER_NAME.length)); // decoded with the JVM default charset, since no charset argument is given
+        if (AsterixTransactionIdString.length() > 0) { // an empty value leaves the current txnId untouched
+            this.txnId = new 
TxnId(Integer.parseInt(AsterixTransactionIdString)); // Integer.parseInt throws NumberFormatException if the text is not a decimal int
+        }
+    }
+
+    @Override
     public IJobletEventListener createListener(final IHyracksJobletContext 
jobletContext) {
 
         return new IJobletEventListener() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index a63f3ca..23c86f3 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -20,6 +20,7 @@ package org.apache.asterix.runtime.job.listener;
 
 import java.util.List;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
@@ -29,13 +30,14 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.JobStatus;
 
 /**
  * This Joblet enable transactions on multiple datasets to take place in the 
same Hyracks Job
  * It takes a list of Transaction job ids instead of a single job Id
  */
-public class MultiTransactionJobletEventListenerFactory implements 
IJobletEventListenerFactory {
+public class MultiTransactionJobletEventListenerFactory implements 
IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
     private final List<TxnId> txnIds;
@@ -46,6 +48,22 @@ public class MultiTransactionJobletEventListenerFactory 
implements IJobletEventL
         this.transactionalWrite = transactionalWrite;
     }
 
+    //TODO: Enable this factory to be usable for Deployed Jobs
+    @Override
+    public TxnId getTxnId(TxnId compiledTxnId) { // Returns the caller-supplied id unchanged; the txnIds list is not consulted.
+        return compiledTxnId;
+    }
+
+    @Override
+    public IJobletEventListenerFactory copyFactory() { // Creates a separate factory instance from this one's field values.
+        return new MultiTransactionJobletEventListenerFactory(txnIds, 
transactionalWrite); // passes this factory's txnIds and transactionalWrite straight to the constructor
+    }
+
+    @Override
+    public void updateListenerJobParameters(JobParameterByteStore 
jobParameterByteStore) { // The job parameter store is ignored by this factory; no field is changed.
+        //no op
+    }
+
     @Override
     public IJobletEventListener createListener(final IHyracksJobletContext 
jobletContext) {
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 73b9b41..3f3dbd9 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -28,6 +29,7 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 
 public class LockThenSearchOperationCallbackFactory extends 
AbstractOperationCallbackFactory
@@ -45,7 +47,9 @@ public class LockThenSearchOperationCallbackFactory extends 
AbstractOperationCal
             IOperatorNodePushable operatorNodePushable) throws 
HyracksDataException {
         ITransactionSubsystem txnSubsystem = 
txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
             return new LockThenSearchOperationCallback(new 
DatasetId(datasetId), primaryKeyFields, txnSubsystem, txnCtx,
                     operatorNodePushable);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index dbf58e4..93108f9 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -29,6 +30,7 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
@@ -47,7 +49,9 @@ public class 
PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = 
txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
             return new PrimaryIndexInstantSearchOperationCallback(new 
DatasetId(datasetId), primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index b8c6084..fb01952 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@ import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
@@ -66,7 +68,9 @@ public class PrimaryIndexModificationOperationCallbackFactory 
extends AbstractOp
         }
 
         try {
-            ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 49490a1..076b0d9 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -29,6 +30,7 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
@@ -47,7 +49,9 @@ public class PrimaryIndexSearchOperationCallbackFactory 
extends AbstractOperatio
             IOperatorNodePushable operatorNodePushable) throws 
HyracksDataException {
         ITransactionSubsystem txnSubsystem = 
txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
             return new PrimaryIndexSearchOperationCallback(new 
DatasetId(datasetId), primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 1847e80..5882046 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@ import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -62,7 +64,9 @@ public class 
SecondaryIndexModificationOperationCallbackFactory extends Abstract
         }
 
         try {
-            ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 38adf2b..79ce788 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@ import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -63,7 +65,9 @@ public class 
TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
         }
 
         try {
-            ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 5e499fc..8a27914 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@ import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
@@ -65,7 +67,9 @@ public class 
TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
         }
 
         try {
-            ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
             IModificationOperationCallback modCallback = new 
TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index be75ab9..dfd3eb1 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -31,6 +32,7 @@ import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -62,7 +64,9 @@ public class UpsertOperationCallbackFactory extends 
AbstractOperationCallbackFac
         }
 
         try {
-            ITransactionContext txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
             IModificationOperationCallback modCallback = new 
UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), 
txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 2e43957..58f7e69 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -19,11 +19,13 @@
 
 package org.apache.asterix.transaction.management.runtime;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 
 public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
@@ -55,7 +57,9 @@ public class CommitRuntimeFactory implements 
IPushRuntimeFactory {
 
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-            return new CommitRuntime(ctx, txnId, datasetId, primaryKeyFields, 
isTemporaryDatasetWriteJob,
-                    isWriteTransaction, 
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory(); // listener factory taken from this task's joblet context
+        return new CommitRuntime(ctx, ((IJobEventListenerFactory) 
fact).getTxnId(txnId), datasetId, // the factory picks the id to use, replacing the txnId field the removed lines passed directly; the cast fails with ClassCastException for any other factory type
+                primaryKeyFields, isTemporaryDatasetWriteJob, 
isWriteTransaction,
+                
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); // partition entry is selected by this task's partition index
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 95479c1..23c41fe 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -22,10 +22,13 @@ import java.io.Serializable;
 import java.net.URL;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 
@@ -104,12 +107,12 @@ public class HyracksClientInterfaceFunctions {
         }
     }
 
-    public static class DistributeJobFunction extends Function {
+    public static class DeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
         private final byte[] acggfBytes;
 
-        public DistributeJobFunction(byte[] acggfBytes) {
+        public DeployJobSpecFunction(byte[] acggfBytes) {
             this.acggfBytes = acggfBytes;
         }
 
@@ -145,13 +148,13 @@ public class HyracksClientInterfaceFunctions {
         }
     }
 
-    public static class DestroyJobFunction extends Function {
+    public static class UndeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
 
-        public DestroyJobFunction(JobId jobId) {
-            this.jobId = jobId;
+        public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+            this.deployedJobSpecId = deployedJobSpecId;
         }
 
         @Override
@@ -159,8 +162,8 @@ public class HyracksClientInterfaceFunctions {
             return FunctionId.DESTROY_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
     }
 
@@ -168,27 +171,30 @@ public class HyracksClientInterfaceFunctions {
         private static final long serialVersionUID = 1L;
 
         private final byte[] acggfBytes;
-        private final EnumSet<JobFlag> jobFlags;
+        private final Set<JobFlag> jobFlags;
         private final DeploymentId deploymentId;
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
+        private final Map<byte[], byte[]> jobParameters;
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags,
+                DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
             this.acggfBytes = acggfBytes;
             this.jobFlags = jobFlags;
             this.deploymentId = deploymentId;
-            this.jobId = jobId;
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.jobParameters = jobParameters;
         }
 
-        public StartJobFunction(JobId jobId) {
-            this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
+        public StartJobFunction(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
+            this(null, null, EnumSet.noneOf(JobFlag.class), deployedJobSpecId, jobParameters);
         }
 
-        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(null, acggfBytes, jobFlags, null);
+        public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(null, acggfBytes, jobFlags, null, null);
         }
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(deploymentId, acggfBytes, jobFlags, null);
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(deploymentId, acggfBytes, jobFlags, null, null);
         }
 
         @Override
@@ -196,15 +202,19 @@ public class HyracksClientInterfaceFunctions {
             return FunctionId.START_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
+        }
+
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
 
         public byte[] getACGGFBytes() {
             return acggfBytes;
         }
 
-        public EnumSet<JobFlag> getJobFlags() {
+        public Set<JobFlag> getJobFlags() {
             return jobFlags;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0ded84f..eddcaa5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
@@ -76,9 +77,9 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
+                new HyracksClientInterfaceFunctions.StartJobFunction(deployedJobSpecId, jobParameters);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -90,17 +91,17 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
     }
 
     @Override
-    public JobId distributeJob(byte[] acggfBytes) throws Exception {
-        HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
-                new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
-        return (JobId) rpci.call(ipcHandle, sjf);
+    public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception {
+        HyracksClientInterfaceFunctions.DeployJobSpecFunction sjf =
+                new HyracksClientInterfaceFunctions.DeployJobSpecFunction(acggfBytes);
+        return (DeployedJobSpecId) rpci.call(ipcHandle, sjf);
     }
 
     @Override
-    public JobId destroyJob(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
-                new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
-        return (JobId) rpci.call(ipcHandle, sjf);
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+        HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf =
+                new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId);
+        return (DeployedJobSpecId) rpci.call(ipcHandle, sjf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index e979da6..85ef927 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGe
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -109,20 +110,20 @@ public final class HyracksConnection implements IHyracksClientConnection {
     }
 
     @Override
-    public JobId distributeJob(JobSpecification jobSpec) throws Exception {
-        IActivityClusterGraphGeneratorFactory jsacggf =
+    public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
                 new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
-        return distributeJob(jsacggf);
+        return deployJobSpec(jsacggf);
     }
 
     @Override
-    public JobId destroyJob(JobId jobId) throws Exception {
-        return hci.destroyJob(jobId);
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+        return hci.undeployJobSpec(deployedJobSpecId);
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
-        return hci.startJob(jobId);
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
+        return hci.startJob(deployedJobSpecId, jobParameters);
     }
 
     @Override
@@ -130,8 +131,8 @@ public final class HyracksConnection implements IHyracksClientConnection {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
-    public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
-        return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
+    public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
+        return hci.deployJobSpec(JavaSerializationUtils.serialize(acggf));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index a7c1d75..510a6b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -20,9 +20,11 @@ package org.apache.hyracks.api.client;
 
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -94,25 +96,27 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Flags
      * @throws Exception
      */
-    JobId distributeJob(JobSpecification jobSpec) throws Exception;
+    DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
 
     /**
-     * Destroy the distributed graph for a pre-distributed job
+     * Remove the deployed Job Spec
      *
-     * @param jobId
-     *            The id of the predistributed job
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
      * @throws Exception
      */
-    JobId destroyJob(JobId jobId) throws Exception;
+    DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     /**
-     * Used to run a pre-distributed job by id (the same JobId will be returned)
+     * Used to run a deployed Job Spec by id
      *
-     * @param jobId
-     *            The id of the predistributed job
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
+     * @param jobParameters
+     *            The serialized job parameters
      * @throws Exception
      */
-    JobId startJob(JobId jobId) throws Exception;
+    JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
 
     /**
      * Start the specified Job.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 9cebd3e..f0c7872 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
@@ -38,13 +39,13 @@ public interface IHyracksClientInterface {
 
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 
-    public JobId startJob(JobId jobId) throws Exception;
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
 
     public void cancelJob(JobId jobId) throws Exception;
 
-    public JobId distributeJob(byte[] acggfBytes) throws Exception;
+    public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
 
-    public JobId destroyJob(JobId jobId) throws Exception;
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
index 7dd5fe9..0b2cc9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -28,7 +28,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,7 +35,6 @@ import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.ActivityClusterId;
 import org.apache.hyracks.api.job.JobActivityGraph;
-import org.apache.hyracks.api.job.JobId;
 
 public class ActivityClusterGraphBuilder {
     private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
@@ -70,7 +68,7 @@ public class ActivityClusterGraphBuilder {
         return null;
     }
 
-    public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) {
+    public ActivityClusterGraph inferActivityClusters(JobActivityGraph jag) {
         /*
          * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
          */
@@ -99,7 +97,7 @@ public class ActivityClusterGraphBuilder {
         Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
         List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
         for (Set<ActivityId> stage : stages) {
-            ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++));
+            ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(acCounter++));
             acList.add(ac);
             for (ActivityId aid : stage) {
                 IActivity activity = activityNodeMap.get(aid);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index c712b36..ddf0ce8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -32,7 +32,6 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobActivityGraph;
 import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.rewriter.ActivityClusterGraphRewriter;
 
@@ -51,8 +50,8 @@ public class JobSpecificationActivityClusterGraphGeneratorFactory implements IAc
     }
 
     @Override
-    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
-            final ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException {
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(final ICCServiceContext ccServiceCtx,
+            Set<JobFlag> jobFlags) throws HyracksException {
         final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
         PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
             @Override
@@ -70,7 +69,7 @@ public class JobSpecificationActivityClusterGraphGeneratorFactory implements IAc
         final JobActivityGraph jag = builder.getActivityGraph();
         ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
 
-        final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+        final ActivityClusterGraph acg = acgb.inferActivityClusters(jag);
         acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
         
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 6eb0adb..1100335 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.api.context;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
@@ -34,6 +35,8 @@ public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocat
 
     Object getGlobalJobData();
 
+    IJobletEventListenerFactory getJobletEventListenerFactory();
+
     Class<?> loadClass(String className) throws HyracksException;
 
     ClassLoader getClassLoader() throws HyracksException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 10bb336..bf42d0c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
 import org.apache.hyracks.api.job.JobFlag;
@@ -52,6 +53,8 @@ public interface IHyracksTaskContext
 
     Object getSharedObject();
 
+    public byte[] getJobParameter(byte[] name, int start, int length) throws HyracksException;
+
     Set<JobFlag> getJobFlags();
 
     IStatsCollector getStatsCollector();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7926469..eaf9bbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -54,9 +54,9 @@ public class ErrorCode {
     public static final int INCONSISTENT_RESULT_METADATA = 18;
     public static final int CANNOT_DELETE_FILE = 19;
     public static final int NOT_A_JOBID = 20;
-    public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
-    public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
-    public static final int DISTRIBUTED_JOB_FAILURE = 23;
+    public static final int ERROR_FINDING_DEPLOYED_JOB = 21;
+    public static final int DUPLICATE_DEPLOYED_JOB = 22;
+    public static final int DEPLOYED_JOB_FAILURE = 23;
     public static final int NO_RESULT_SET = 24;
     public static final int JOB_CANCELED = 25;
     public static final int NODE_FAILED = 26;

Reply via email to