[ASTERIXDB-1911][HYR,RT,CLUS] Fixes and Improvements for Deployed Jobs Rename Predistributed Jobs to Deployed Jobs Enable job executions to have a map of job parameters Add an Asterix function to retrieve these parameters which are can be assigned when the job is run, e.g. for Deployed jobs Allow Deployed jobs to have new TxnIds and JobIds for each execution Allow simultaneous execution of one Deployed Job
Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2045 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Xikui Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e6f426b8 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e6f426b8 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e6f426b8 Branch: refs/heads/master Commit: e6f426b8019957dd15962926788edbe655218cb9 Parents: 592af65 Author: Steven Glenn Jacobs <[email protected]> Authored: Tue Nov 14 13:56:19 2017 -0800 Committer: Steven Jacobs <[email protected]> Committed: Tue Nov 14 17:02:17 2017 -0800 ---------------------------------------------------------------------- asterixdb/asterix-active/pom.xml | 5 + .../asterix/active/DeployedJobService.java | 108 ++++++++++ .../asterix/app/translator/QueryTranslator.java | 10 +- .../app/bootstrap/TestNodeController.java | 3 + .../common/api/IJobEventListenerFactory.java | 30 +++ .../statement/CreateFunctionStatement.java | 6 +- .../lang/common/visitor/FormatPrintVisitor.java | 7 +- .../asterix/metadata/utils/DatasetUtil.java | 3 +- .../asterix/om/functions/BuiltinFunctions.java | 6 + .../AbstractUnaryStringStringEval.java | 3 +- .../GetJobParameterByNameDescriptor.java | 77 +++++++ .../runtime/functions/FunctionCollection.java | 8 +- .../job/listener/JobEventListenerFactory.java | 30 ++- ...tiTransactionJobletEventListenerFactory.java | 20 +- .../LockThenSearchOperationCallbackFactory.java | 6 +- ...exInstantSearchOperationCallbackFactory.java | 6 +- ...dexModificationOperationCallbackFactory.java | 6 +- ...maryIndexSearchOperationCallbackFactory.java | 6 +- ...dexModificationOperationCallbackFactory.java | 6 +- ...dexModificationOperationCallbackFactory.java | 6 +- ...dexModificationOperationCallbackFactory.java | 6 +- .../UpsertOperationCallbackFactory.java | 6 +- .../runtime/CommitRuntimeFactory.java | 8 +- .../client/HyracksClientInterfaceFunctions.java | 52 +++-- .../HyracksClientInterfaceRemoteProxy.java | 21 +- .../hyracks/api/client/HyracksConnection.java | 19 +- .../api/client/IHyracksClientConnection.java | 22 +- .../api/client/IHyracksClientInterface.java | 7 +- .../impl/ActivityClusterGraphBuilder.java | 6 +- ...ionActivityClusterGraphGeneratorFactory.java | 7 +- .../api/context/IHyracksJobletContext.java | 3 + .../api/context/IHyracksTaskContext.java | 3 + .../hyracks/api/exceptions/ErrorCode.java | 6 +- .../hyracks/api/job/ActivityClusterId.java | 19 +- .../hyracks/api/job/DeployedJobSpecId.java | 91 ++++++++ .../api/job/DeployedJobSpecIdFactory.java | 34 +++ .../IActivityClusterGraphGeneratorFactory.java | 2 +- .../api/job/IJobletEventListenerFactory.java | 7 +- .../hyracks/api/job/JobParameterByteStore.java | 64 ++++++ .../hyracks/control/cc/ClientInterfaceIPCI.java | 36 ++-- .../control/cc/ClusterControllerIPCI.java | 61 +++--- .../control/cc/ClusterControllerService.java | 36 +++- .../control/cc/DeployedJobSpecStore.java | 102 +++++++++ .../control/cc/PreDistributedJobStore.java | 104 ---------- .../cc/dataset/DatasetDirectoryService.java | 12 +- .../control/cc/executor/JobExecutor.java | 16 +- .../hyracks/control/cc/job/JobManager.java | 5 +- .../apache/hyracks/control/cc/job/JobRun.java | 17 +- .../control/cc/work/DeployJobSpecWork.java | 76 +++++++ .../control/cc/work/DeployedJobFailureWork.java | 39 ++++ .../hyracks/control/cc/work/DestroyJobWork.java | 52 ----- .../control/cc/work/DistributeJobWork.java | 80 ------- .../cc/work/DistributedJobFailureWork.java | 39 ---- .../hyracks/control/cc/work/JobStartWork.java | 26 ++- .../control/cc/work/UndeployJobSpecWork.java | 53 +++++ .../control/common/base/IClusterController.java | 3 +- .../control/common/base/INodeController.java | 9 +- .../control/common/ipc/CCNCFunctions.java | 100 +++++++-- .../ipc/ClusterControllerRemoteProxy.java | 26 ++- .../common/ipc/NodeControllerRemoteProxy.java | 28 ++- .../org/apache/hyracks/control/nc/Joblet.java | 16 +- .../hyracks/control/nc/NodeControllerIPCI.java | 18 +- .../control/nc/NodeControllerService.java | 50 +++-- .../org/apache/hyracks/control/nc/Task.java | 3 + .../control/nc/work/CleanupJobletWork.java | 1 + .../control/nc/work/DeployJobSpecWork.java | 62 ++++++ .../hyracks/control/nc/work/DestroyJobWork.java | 54 ----- .../control/nc/work/DistributeJobWork.java | 63 ------ .../hyracks/control/nc/work/StartTasksWork.java | 33 ++- .../control/nc/work/UndeployJobSpecWork.java | 54 +++++ .../tests/integration/DeployedJobSpecsTest.java | 206 +++++++++++++++++++ .../integration/PredistributedJobsTest.java | 186 ----------------- .../hyracks/test/support/TestJobletContext.java | 8 +- .../hyracks/test/support/TestTaskContext.java | 4 + 74 files changed, 1554 insertions(+), 859 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-active/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml index 5865cd1..6b0c381 100644 --- a/asterixdb/asterix-active/pom.xml +++ b/asterixdb/asterix-active/pom.xml @@ -31,6 +31,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.asterix</groupId> + <artifactId>asterix-transactions</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java new file mode 100644 index 0000000..070d838 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.active; + +import java.time.Instant; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.JobId; + +/** + * Provides functionality for running DeployedJobSpecs + */ +public class DeployedJobService { + + private static final Logger LOGGER = Logger.getLogger(DeployedJobService.class.getName()); + + //To enable new Asterix TxnId for separate deployed job spec invocations + private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes(); + + //pool size one (only running one thread at a time) + private static final int POOL_SIZE = 1; + + //Starts running a deployed job specification periodically with an interval of "duration" seconds + public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, + IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId) { + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE); + scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (!runRepetitiveDeployedJobSpec(distributedId, hcc, jobParameters, duration, entityId)) { + scheduledExecutorService.shutdown(); + } + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " " + + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e); + } + } + }, duration, duration, TimeUnit.MILLISECONDS); + return scheduledExecutorService; + } + + public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc, + Map<byte[], byte[]> jobParameters, long duration, EntityId entityId) throws Exception { + long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, jobParameters, entityId); + if (executionMilliseconds > duration && LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, + "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "." + + entityId.getEntityName() + " was unable to meet the required period of " + duration + + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown" + + new Date()); + return false; + } + return true; + } + + public synchronized static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc, + Map<byte[], byte[]> jobParameters, EntityId entityId) throws Exception { + JobId jobId; + long startTime = Instant.now().toEpochMilli(); + + //Add the Asterix Transaction Id to the map + jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, String.valueOf(TxnIdFactory.create().getId()).getBytes()); + jobId = hcc.startJob(distributedId, jobParameters); + + hcc.waitForCompletion(jobId); + long executionMilliseconds = Instant.now().toEpochMilli() - startTime; + + LOGGER.log(Level.INFO, + "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse() + + "." + + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds "); + + return executionMilliseconds; + + } + + @Override + public String toString() { + return "DeployedJobSpecService"; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index a811454..a52d765 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -744,7 +744,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen return DatasetUtil.createNodeGroupForNewDataset(dataverseName, datasetName, selectedNodes, metadataProvider); } - protected void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt, + public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt; String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName()); @@ -1672,9 +1672,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { CreateFunctionStatement cfs = (CreateFunctionStatement) stmt; - String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace()); - cfs.getSignature().setNamespace(dataverse); - String functionName = cfs.getaAterixFunction().getName(); + String dataverse = getActiveDataverseName(cfs.getFunctionSignature().getNamespace()); + cfs.getFunctionSignature().setNamespace(dataverse); + String functionName = cfs.getFunctionSignature().getName(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -1685,7 +1685,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (dv == null) { throw new AlgebricksException("There is no dataverse with this name " + dataverse + "."); } - Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(), + Function function = new Function(dataverse, functionName, cfs.getFunctionSignature().getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), rewriterFactory instanceof SqlppRewriterFactory ? Function.LANGUAGE_SQLPP : Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index df8eef7..90b9a1e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -58,6 +58,7 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; +import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable; import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.test.runtime.ExecutionTestUtil; @@ -298,6 +299,8 @@ public class TestNodeController { } JobId jobId = newJobId(); IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class); + JobEventListenerFactory factory = new JobEventListenerFactory(new TxnId(jobId.getId()), true); + Mockito.when(jobletCtx.getJobletEventListenerFactory()).thenReturn(factory); Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext()); Mockito.when(jobletCtx.getJobId()).thenReturn(jobId); ctx = Mockito.spy(ctx); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java new file mode 100644 index 0000000..0f37b13 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import org.apache.asterix.common.transactions.TxnId; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; + +/** + * an interface for JobEventListenerFactories to add Asterix transaction JobId getter + */ +public interface IJobEventListenerFactory extends IJobletEventListenerFactory { + + TxnId getTxnId(TxnId compiledTxnId); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java index 6d74957..84d66ce 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java @@ -34,10 +34,6 @@ public class CreateFunctionStatement implements Statement { private final boolean ifNotExists; private final List<String> paramList; - public FunctionSignature getaAterixFunction() { - return signature; - } - public String getFunctionBody() { return functionBody; } @@ -66,7 +62,7 @@ public class CreateFunctionStatement implements Statement { return paramList; } - public FunctionSignature getSignature() { + public FunctionSignature getFunctionSignature() { return signature; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java index a2e7341..c74ee5d 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java @@ -232,7 +232,7 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> { out.print("("); exprList.get(0).accept(this, step + 1); for (int i = 1; i < exprList.size(); i++) { - OperatorType opType = opList.get(i - 1);; + OperatorType opType = opList.get(i - 1); if (i == 1) { printHints(operatorExpr.getHints(), step + 1); } @@ -657,7 +657,7 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> { out.print(skip(step) + CREATE + " index "); out.print(normalize(cis.getIndexName().getValue()) + " "); out.print(generateIfNotExists(cis.getIfNotExists())); - out.print(" on ");; + out.print(" on "); out.print(generateFullName(cis.getDataverseName(), cis.getDatasetName())); out.print(" ("); @@ -827,7 +827,8 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> { public Void visit(CreateFunctionStatement cfs, Integer step) throws CompilationException { out.print(skip(step) + CREATE + " function "); out.print(generateIfNotExists(cfs.getIfNotExists())); - out.print(this.generateFullName(cfs.getSignature().getNamespace(), cfs.getSignature().getName())); + out.print( + this.generateFullName(cfs.getFunctionSignature().getNamespace(), cfs.getFunctionSignature().getName())); out.print("("); printDelimitedStrings(cfs.getParamList(), COMMA); out.println(") {"); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 07d3c69..e4c4860 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -205,7 +205,6 @@ public class DatasetUtil { return new Pair<>(mergePolicyFactory, properties); } - @SuppressWarnings("unchecked") public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType) throws HyracksDataException { IARecordBuilder propertyRecordBuilder = new RecordBuilder(); @@ -399,7 +398,7 @@ public class DatasetUtil { metadataProvider.getSplitProviderAndConstraints(dataset); // prepare callback - TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(); + TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(null); int[] primaryKeyFields = new int[numKeys]; for (int i = 0; i < numKeys; i++) { primaryKeyFields[i] = i; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java index cdb0dc0..e59c600 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java @@ -864,6 +864,9 @@ public class BuiltinFunctions { public static final FunctionIdentifier EXTERNAL_LOOKUP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "external-lookup", FunctionIdentifier.VARARGS); + public static final FunctionIdentifier GET_JOB_PARAMETER = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "get-job-param", 1); + public static final FunctionIdentifier META = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta", FunctionIdentifier.VARARGS); public static final FunctionIdentifier META_KEY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key", @@ -1274,6 +1277,9 @@ public class BuiltinFunctions { // external lookup addPrivateFunction(EXTERNAL_LOOKUP, AnyTypeComputer.INSTANCE, false); + // get job parameter + addFunction(GET_JOB_PARAMETER, AnyTypeComputer.INSTANCE, false); + // unnesting function addPrivateFunction(SCAN_COLLECTION, CollectionMemberResultType.INSTANCE, true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java index 1f9909c..95b6ef6 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java @@ -22,8 +22,8 @@ package org.apache.asterix.runtime.evaluators.functions; import java.io.DataOutput; import java.io.IOException; -import org.apache.asterix.runtime.exceptions.TypeMismatchException; import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.exceptions.TypeMismatchException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; @@ -57,7 +57,6 @@ abstract class AbstractUnaryStringStringEval implements IScalarEvaluator { this.funcID = funcID; } - @SuppressWarnings("unchecked") @Override public void evaluate(IFrameTupleReference tuple, IPointable resultPointable) throws HyracksDataException { resultStorage.reset(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java new file mode 100644 index 0000000..17f7a96 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.evaluators.functions; + +import java.io.IOException; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; + +public class GetJobParameterByNameDescriptor extends AbstractScalarFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new GetJobParameterByNameDescriptor(); + } + }; + + @Override + public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) { + return new IScalarEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws HyracksDataException { + return new AbstractUnaryStringStringEval(ctx, args[0], + GetJobParameterByNameDescriptor.this.getIdentifier()) { + private byte[] result; + + @Override + protected void process(UTF8StringPointable inputString, IPointable resultPointable) + throws IOException { + result = ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(), + inputString.getLength()); + } + + @Override + void writeResult(IPointable resultPointable) throws IOException { + resultPointable.set(result, 0, result.length); + } + }; + } + }; + } + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.GET_JOB_PARAMETER; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java index 5acbeb9..f3eb6c9 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java @@ -158,6 +158,7 @@ import org.apache.asterix.runtime.evaluators.functions.EditDistanceStringIsFilte import org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor; import org.apache.asterix.runtime.evaluators.functions.FullTextContainsWithoutOptionDescriptor; import org.apache.asterix.runtime.evaluators.functions.GetItemDescriptor; +import org.apache.asterix.runtime.evaluators.functions.GetJobParameterByNameDescriptor; import org.apache.asterix.runtime.evaluators.functions.GramTokensDescriptor; import org.apache.asterix.runtime.evaluators.functions.HashedGramTokensDescriptor; import org.apache.asterix.runtime.evaluators.functions.HashedWordTokensDescriptor; @@ -437,6 +438,9 @@ public final class FunctionCollection { // Inject failure function fc.add(InjectFailureDescriptor.FACTORY); + // Get Job Parameter function + fc.add(GetJobParameterByNameDescriptor.FACTORY); + // Switch case fc.add(SwitchCaseDescriptor.FACTORY); @@ -737,8 +741,8 @@ public final class FunctionCollection { */ private static IFunctionDescriptorFactory getGeneratedFunctionDescriptorFactory(Class<?> cl) { try { - String className = CodeGenHelper.getGeneratedClassName(cl.getName(), - CodeGenHelper.DEFAULT_SUFFIX_FOR_GENERATED_CLASS); + String className = + CodeGenHelper.getGeneratedClassName(cl.getName(), CodeGenHelper.DEFAULT_SUFFIX_FOR_GENERATED_CLASS); Class<?> generatedCl = cl.getClassLoader().loadClass(className); Field factory = generatedCl.getDeclaredField(FACTORY); return (IFunctionDescriptorFactory) factory.get(null); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java index 1422e42..23ad1e1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.runtime.job.listener; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.DatasetId; @@ -27,14 +28,19 @@ import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.job.IJobletEventListener; import org.apache.hyracks.api.job.IJobletEventListenerFactory; +import org.apache.hyracks.api.job.JobParameterByteStore; import org.apache.hyracks.api.job.JobStatus; -public class JobEventListenerFactory implements IJobletEventListenerFactory { +public class JobEventListenerFactory implements IJobEventListenerFactory { private static final long serialVersionUID = 1L; - private final TxnId txnId; + + private TxnId txnId; private final boolean transactionalWrite; + //To enable new Asterix TxnId for separate deployed job spec invocations + private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes(); + public JobEventListenerFactory(TxnId txnId, boolean transactionalWrite) { this.txnId = txnId; this.transactionalWrite = transactionalWrite; @@ -45,6 +51,26 @@ public class JobEventListenerFactory implements IJobletEventListenerFactory { } @Override + public TxnId getTxnId(TxnId compiledTxnId) { + return txnId; + } + + @Override + public IJobletEventListenerFactory copyFactory() { + return new JobEventListenerFactory(txnId, transactionalWrite); + } + + @Override + public void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore) { + String AsterixTransactionIdString = + new String(jobParameterByteStore.getParameterValue(TRANSACTION_ID_PARAMETER_NAME, 0, + TRANSACTION_ID_PARAMETER_NAME.length)); + if (AsterixTransactionIdString.length() > 0) { + this.txnId = new TxnId(Integer.parseInt(AsterixTransactionIdString)); + } + } + + @Override public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) { return new IJobletEventListener() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java index a63f3ca..23c86f3 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java @@ -20,6 +20,7 @@ package org.apache.asterix.runtime.job.listener; import java.util.List; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.DatasetId; @@ -29,13 +30,14 @@ import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.job.IJobletEventListener; import org.apache.hyracks.api.job.IJobletEventListenerFactory; +import org.apache.hyracks.api.job.JobParameterByteStore; import org.apache.hyracks.api.job.JobStatus; /** * This Joblet enable transactions on multiple datasets to take place in the same Hyracks Job * It takes a list of Transaction job ids instead of a single job Id */ -public class MultiTransactionJobletEventListenerFactory implements IJobletEventListenerFactory { +public class MultiTransactionJobletEventListenerFactory implements IJobEventListenerFactory { private static final long serialVersionUID = 1L; private final List<TxnId> txnIds; @@ -46,6 +48,22 @@ public class MultiTransactionJobletEventListenerFactory implements IJobletEventL this.transactionalWrite = transactionalWrite; } + //TODO: Enable this factory to be usable for Deployed Jobs + @Override + public TxnId getTxnId(TxnId compiledTxnId) { + return compiledTxnId; + } + + @Override + public IJobletEventListenerFactory copyFactory() { + return new MultiTransactionJobletEventListenerFactory(txnIds, transactionalWrite); + } + + @Override + public void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore) { + //no op + } + @Override public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java index 73b9b41..3f3dbd9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.transaction.management.opcallbacks; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; @@ -28,6 +29,7 @@ import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; public class LockThenSearchOperationCallbackFactory extends AbstractOperationCallbackFactory @@ -45,7 +47,9 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal IOperatorNodePushable operatorNodePushable) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager() + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); return new LockThenSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem, txnCtx, operatorNodePushable); } catch (ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java index dbf58e4..93108f9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.transaction.management.opcallbacks; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; @@ -29,6 +30,7 @@ import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -47,7 +49,9 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager() + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem.getLockManager(), txnCtx); } catch (ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java index b8c6084..fb01952 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.transaction.management.opcallbacks; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; @@ -32,6 +33,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.common.IIndex; @@ -66,7 +68,9 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager() + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java index 49490a1..076b0d9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.transaction.management.opcallbacks; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory; @@ -29,6 +30,7 @@ import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -47,7 +49,9 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio IOperatorNodePushable operatorNodePushable) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager() + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem.getLockManager(), txnCtx); } catch (ACIDException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java index 1847e80..5882046 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.transaction.management.opcallbacks; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; @@ -32,6 +33,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.common.IModificationOperationCallback; @@ -62,7 +64,9 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager() + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java index 38adf2b..79ce788 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.transaction.management.opcallbacks; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; @@ -32,6 +33,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.common.IModificationOperationCallback; @@ -63,7 +65,9 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager() + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java index 5e499fc..8a27914 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.transaction.management.opcallbacks; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; @@ -32,6 +33,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.common.IIndex; @@ -65,7 +67,9 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager() + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java index be75ab9..dfd3eb1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.transaction.management.opcallbacks; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; @@ -31,6 +32,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.common.IModificationOperationCallback; @@ -62,7 +64,9 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac } try { - ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + ITransactionContext txnCtx = txnSubsystem.getTransactionManager() + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false); IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java index 2e43957..58f7e69 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -19,11 +19,13 @@ package org.apache.asterix.transaction.management.runtime; +import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; public class CommitRuntimeFactory implements IPushRuntimeFactory { @@ -55,7 +57,9 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory { @Override public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return new CommitRuntime(ctx, txnId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, - isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); + IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); + return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId, + primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction, + datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index 95479c1..23c41fe 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -22,10 +22,13 @@ import java.io.Serializable; import java.net.URL; import java.util.EnumSet; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hyracks.api.dataset.DatasetDirectoryRecord; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; @@ -104,12 +107,12 @@ public class HyracksClientInterfaceFunctions { } } - public static class DistributeJobFunction extends Function { + public static class DeployJobSpecFunction extends Function { private static final long serialVersionUID = 1L; private final byte[] acggfBytes; - public DistributeJobFunction(byte[] acggfBytes) { + public DeployJobSpecFunction(byte[] acggfBytes) { this.acggfBytes = acggfBytes; } @@ -145,13 +148,13 @@ public class HyracksClientInterfaceFunctions { } } - public static class DestroyJobFunction extends Function { + public static class UndeployJobSpecFunction extends Function { private static final long serialVersionUID = 1L; - private final JobId jobId; + private final DeployedJobSpecId deployedJobSpecId; - public DestroyJobFunction(JobId jobId) { - this.jobId = jobId; + public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) { + this.deployedJobSpecId = deployedJobSpecId; } @Override @@ -159,8 +162,8 @@ public class HyracksClientInterfaceFunctions { return FunctionId.DESTROY_JOB; } - public JobId getJobId() { - return jobId; + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; } } @@ -168,27 +171,30 @@ public class HyracksClientInterfaceFunctions { private static final long serialVersionUID = 1L; private final byte[] acggfBytes; - private final EnumSet<JobFlag> jobFlags; + private final Set<JobFlag> jobFlags; private final DeploymentId deploymentId; - private final JobId jobId; + private final DeployedJobSpecId deployedJobSpecId; + private final Map<byte[], byte[]> jobParameters; - public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) { + public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags, + DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) { this.acggfBytes = acggfBytes; this.jobFlags = jobFlags; this.deploymentId = deploymentId; - this.jobId = jobId; + this.deployedJobSpecId = deployedJobSpecId; + this.jobParameters = jobParameters; } - public StartJobFunction(JobId jobId) { - this(null, null, EnumSet.noneOf(JobFlag.class), jobId); + public StartJobFunction(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) { + this(null, null, EnumSet.noneOf(JobFlag.class), deployedJobSpecId, jobParameters); } - public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { - this(null, acggfBytes, jobFlags, null); + public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) { + this(null, acggfBytes, jobFlags, null, null); } - public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { - this(deploymentId, acggfBytes, jobFlags, null); + public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags) { + this(deploymentId, acggfBytes, jobFlags, null, null); } @Override @@ -196,15 +202,19 @@ public class HyracksClientInterfaceFunctions { return FunctionId.START_JOB; } - public JobId getJobId() { - return jobId; + public Map<byte[], byte[]> getJobParameters() { + return jobParameters; + } + + public DeployedJobSpecId getDeployedJobSpecId() { + return deployedJobSpecId; } public byte[] getACGGFBytes() { return acggfBytes; } - public EnumSet<JobFlag> getJobFlags() { + public Set<JobFlag> getJobFlags() { return jobFlags; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index 0ded84f..eddcaa5 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobInfo; @@ -76,9 +77,9 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac } @Override - public JobId startJob(JobId jobId) throws Exception { + public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception { HyracksClientInterfaceFunctions.StartJobFunction sjf = - new HyracksClientInterfaceFunctions.StartJobFunction(jobId); + new HyracksClientInterfaceFunctions.StartJobFunction(deployedJobSpecId, jobParameters); return (JobId) rpci.call(ipcHandle, sjf); } @@ -90,17 +91,17 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac } @Override - public JobId distributeJob(byte[] acggfBytes) throws Exception { - HyracksClientInterfaceFunctions.DistributeJobFunction sjf = - new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes); - return (JobId) rpci.call(ipcHandle, sjf); + public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception { + HyracksClientInterfaceFunctions.DeployJobSpecFunction sjf = + new HyracksClientInterfaceFunctions.DeployJobSpecFunction(acggfBytes); + return (DeployedJobSpecId) rpci.call(ipcHandle, sjf); } @Override - public JobId destroyJob(JobId jobId) throws Exception { - HyracksClientInterfaceFunctions.DestroyJobFunction sjf = - new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId); - return (JobId) rpci.call(ipcHandle, sjf); + public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { + HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf = + new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId); + return (DeployedJobSpecId) rpci.call(ipcHandle, sjf); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index e979da6..85ef927 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -36,6 +36,7 @@ import org.apache.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGe import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; @@ -109,20 +110,20 @@ public final class HyracksConnection implements IHyracksClientConnection { } @Override - public JobId distributeJob(JobSpecification jobSpec) throws Exception { - IActivityClusterGraphGeneratorFactory jsacggf = + public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception { + JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); - return distributeJob(jsacggf); + return deployJobSpec(jsacggf); } @Override - public JobId destroyJob(JobId jobId) throws Exception { - return hci.destroyJob(jobId); + public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { + return hci.undeployJobSpec(deployedJobSpecId); } @Override - public JobId startJob(JobId jobId) throws Exception { - return hci.startJob(jobId); + public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception { + return hci.startJob(deployedJobSpecId, jobParameters); } @Override @@ -130,8 +131,8 @@ public final class HyracksConnection implements IHyracksClientConnection { return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags); } - public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception { - return hci.distributeJob(JavaSerializationUtils.serialize(acggf)); + public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception { + return hci.deployJobSpec(JavaSerializationUtils.serialize(acggf)); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index a7c1d75..510a6b6 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -20,9 +20,11 @@ package org.apache.hyracks.api.client; import java.util.EnumSet; import java.util.List; +import java.util.Map; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; @@ -94,25 +96,27 @@ public interface IHyracksClientConnection extends IClusterInfoCollector { * Flags * @throws Exception */ - JobId distributeJob(JobSpecification jobSpec) throws Exception; + DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception; /** - * Destroy the distributed graph for a pre-distributed job + * Remove the deployed Job Spec * - * @param jobId - * The id of the predistributed job + * @param deployedJobSpecId + * The id of the deployed job spec * @throws Exception */ - JobId destroyJob(JobId jobId) throws Exception; + DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; /** - * Used to run a pre-distributed job by id (the same JobId will be returned) + * Used to run a deployed Job Spec by id * - * @param jobId - * The id of the predistributed job + * @param deployedJobSpecId + * The id of the deployed job spec + * @param jobParameters + * The serialized job parameters * @throws Exception */ - JobId startJob(JobId jobId) throws Exception; + JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception; /** * Start the specified Job. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index 9cebd3e..f0c7872 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobInfo; @@ -38,13 +39,13 @@ public interface IHyracksClientInterface { public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception; - public JobId startJob(JobId jobId) throws Exception; + public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception; public void cancelJob(JobId jobId) throws Exception; - public JobId distributeJob(byte[] acggfBytes) throws Exception; + public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception; - public JobId destroyJob(JobId jobId) throws Exception; + public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java index 7dd5fe9..0b2cc9b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java @@ -28,7 +28,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.commons.lang3.tuple.Pair; - import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; @@ -36,7 +35,6 @@ import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.ActivityClusterId; import org.apache.hyracks.api.job.JobActivityGraph; -import org.apache.hyracks.api.job.JobId; public class ActivityClusterGraphBuilder { private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName()); @@ -70,7 +68,7 @@ public class ActivityClusterGraphBuilder { return null; } - public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) { + public ActivityClusterGraph inferActivityClusters(JobActivityGraph jag) { /* * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t } */ @@ -99,7 +97,7 @@ public class ActivityClusterGraphBuilder { Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap(); List<ActivityCluster> acList = new ArrayList<ActivityCluster>(); for (Set<ActivityId> stage : stages) { - ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++)); + ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(acCounter++)); acList.add(ac); for (ActivityId aid : stage) { IActivity activity = activityNodeMap.get(aid); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java index c712b36..ddf0ce8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java @@ -32,7 +32,6 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGenerator; import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobActivityGraph; import org.apache.hyracks.api.job.JobFlag; -import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.rewriter.ActivityClusterGraphRewriter; @@ -51,8 +50,8 @@ public class JobSpecificationActivityClusterGraphGeneratorFactory implements IAc } @Override - public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId, - final ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException { + public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(final ICCServiceContext ccServiceCtx, + Set<JobFlag> jobFlags) throws HyracksException { final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags); PlanUtils.visit(spec, new IConnectorDescriptorVisitor() { @Override @@ -70,7 +69,7 @@ public class JobSpecificationActivityClusterGraphGeneratorFactory implements IAc final JobActivityGraph jag = builder.getActivityGraph(); ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(); - final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag); + final ActivityClusterGraph acg = acgb.inferActivityClusters(jag); acg.setFrameSize(spec.getFrameSize()); acg.setMaxReattempts(spec.getMaxReattempts()); acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java index 6eb0adb..1100335 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java @@ -21,6 +21,7 @@ package org.apache.hyracks.api.context; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.IWorkspaceFileFactory; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatableRegistry; @@ -34,6 +35,8 @@ public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocat Object getGlobalJobData(); + IJobletEventListenerFactory getJobletEventListenerFactory(); + Class<?> loadClass(String className) throws HyracksException; ClassLoader getClassLoader() throws HyracksException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java index 10bb336..bf42d0c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.IWorkspaceFileFactory; import org.apache.hyracks.api.job.IOperatorEnvironment; import org.apache.hyracks.api.job.JobFlag; @@ -52,6 +53,8 @@ public interface IHyracksTaskContext Object getSharedObject(); + public byte[] getJobParameter(byte[] name, int start, int length) throws HyracksException; + Set<JobFlag> getJobFlags(); IStatsCollector getStatsCollector(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6f426b8/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 7926469..eaf9bbf 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -54,9 +54,9 @@ public class ErrorCode { public static final int INCONSISTENT_RESULT_METADATA = 18; public static final int CANNOT_DELETE_FILE = 19; public static final int NOT_A_JOBID = 20; - public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21; - public static final int DUPLICATE_DISTRIBUTED_JOB = 22; - public static final int DISTRIBUTED_JOB_FAILURE = 23; + public static final int ERROR_FINDING_DEPLOYED_JOB = 21; + public static final int DUPLICATE_DEPLOYED_JOB = 22; + public static final int DEPLOYED_JOB_FAILURE = 23; public static final int NO_RESULT_SET = 24; public static final int JOB_CANCELED = 25; public static final int NODE_FAILED = 26;
