Support IFrameWriter contract check. - add a instance-level flag for injecting operators to check IFrameWriter contract violations; - check contract violations in runtime tests.
Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1618 Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/8cf8be67 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/8cf8be67 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/8cf8be67 Branch: refs/heads/master Commit: 8cf8be67c3e38f161eeaabefe9f84187de2236e3 Parents: 456cb9f Author: Till Westmann <[email protected]> Authored: Fri Jun 9 20:15:33 2017 -0700 Committer: Yingyi Bu <[email protected]> Committed: Sat Jun 10 09:56:35 2017 -0700 ---------------------------------------------------------------------- .../common/AsterixHyracksIntegrationUtil.java | 12 +- .../asterix/app/translator/QueryTranslator.java | 70 ++++++----- .../app/bootstrap/TestNodeController.java | 2 +- .../aql/translator/QueryTranslatorTest.java | 8 ++ .../apache/asterix/common/utils/JobUtils.java | 10 +- .../operators/physical/SinkPOperator.java | 10 +- .../runtime/base/EnforcePushRuntime.java | 49 ++++++++ .../algebricks/runtime/base/IPushRuntime.java | 23 +++- .../aggreg/AggregateRuntimeFactory.java | 1 + ...estedPlansAccumulatingAggregatorFactory.java | 13 +- .../NestedPlansRunningAggregatorFactory.java | 21 +++- .../base/AbstractOneInputPushRuntime.java | 2 +- .../base/AbstractOneInputSinkPushRuntime.java | 2 +- .../base/AbstractOneInputSourcePushRuntime.java | 11 +- .../meta/AlgebricksMetaOperatorDescriptor.java | 10 +- .../operators/meta/PipelineAssembler.java | 14 ++- .../sort/InMemorySortRuntimeFactory.java | 5 - .../std/EmptyTupleSourceRuntimeFactory.java | 5 + .../std/NestedTupleSourceRuntimeFactory.java | 5 - .../std/RunningAggregateRuntimeFactory.java | 2 +- .../operators/std/SinkWriterRuntime.java | 1 + .../std/StreamSelectRuntimeFactory.java | 19 +-- .../client/HyracksClientInterfaceFunctions.java | 2 +- .../hyracks/api/client/HyracksConnection.java | 13 +- .../api/context/IHyracksTaskContext.java | 4 + .../api/dataflow/EnforceFrameWriter.java | 120 +++++++++++++++++++ .../api/dataflow/IOperatorNodePushable.java | 13 +- .../hyracks/api/exceptions/ErrorCode.java | 8 ++ .../org/apache/hyracks/api/job/JobFlag.java | 3 +- .../SuperActivityOperatorNodePushable.java | 16 ++- .../src/main/resources/errormsg/en.properties | 9 +- .../apache/hyracks/control/cc/job/JobRun.java | 5 +- .../hyracks/control/cc/work/JobStartWork.java | 2 +- .../control/common/controllers/CCConfig.java | 15 ++- .../control/common/controllers/NCConfig.java | 9 +- .../org/apache/hyracks/control/nc/Task.java | 14 ++- .../hyracks/control/nc/work/StartTasksWork.java | 55 +++++---- .../preclustered/PreclusteredGroupWriter.java | 3 + .../std/misc/NullSinkOperatorDescriptor.java | 22 +--- .../std/misc/SinkOperatorDescriptor.java | 22 +--- .../std/misc/SinkOperatorNodePushable.java | 48 ++++++++ .../std/sort/AbstractSortRunGenerator.java | 3 + .../IndexSearchOperatorNodePushable.java | 17 ++- .../hyracks/test/support/TestTaskContext.java | 8 ++ 44 files changed, 504 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 9d01d63..ba98f73 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -25,7 +25,6 @@ import java.io.File; import java.io.IOException; import java.net.Inet4Address; import java.util.ArrayList; -import java.util.EnumSet; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -42,9 +41,6 @@ import org.apache.hyracks.api.application.ICCApplication; import org.apache.hyracks.api.application.INCApplication; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.job.JobFlag; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.controllers.CCConfig; @@ -131,6 +127,7 @@ public class AsterixHyracksIntegrationUtil { ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT); ccConfig.setResultTTL(120000L); ccConfig.setResultSweepThreshold(1000L); + ccConfig.setEnforceFrameWriterProtocol(true); configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb")); return ccConfig; } @@ -215,13 +212,6 @@ public class AsterixHyracksIntegrationUtil { } } - public void runJob(JobSpecification spec) throws Exception { - GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString()); - JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); - GlobalConfig.ASTERIX_LOGGER.info(jobId.toString()); - hcc.waitForCompletion(jobId); - } - protected String getDefaultStoragePath() { return joinPath("target", "io", "dir"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 80c05db..7ce9df6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -28,6 +28,7 @@ import java.rmi.RemoteException; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashSet; @@ -191,8 +192,10 @@ import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.UnmanagedFileSplit; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; /* @@ -214,6 +217,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected final IRewriterFactory rewriterFactory; protected final IStorageComponentProvider componentProvider; protected final ExecutorService executorService; + protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class); public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider, @@ -228,6 +232,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen rewriterFactory = compliationProvider.getRewriterFactory(); activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE; this.executorService = executorService; + if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) { + this.jobFlags.add(JobFlag.ENFORCE_CONTRACT); + } } public SessionOutput getSessionOutput() { @@ -621,7 +628,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA); // #. runJob - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); // #. begin new metadataTxn mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -653,7 +660,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } catch (Exception e2) { e.addSuppressed(e2); if (bActiveTxn) { @@ -900,7 +907,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen "Failed to create job spec for replicating Files Index For external dataset"); } filesIndexReplicated = true; - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); } } @@ -937,7 +944,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; // #. create the index artifact in NC. - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; @@ -948,7 +955,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); // #. begin new metadataTxn mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -986,7 +993,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } catch (Exception e2) { e.addSuppressed(e2); if (bActiveTxn) { @@ -1005,7 +1012,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } catch (Exception e2) { e.addSuppressed(e2); if (bActiveTxn) { @@ -1189,7 +1196,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -1226,7 +1233,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // remove the all indexes in NC try { for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } } catch (Exception e2) { // do no throw exception since still the metadata needs to be compensated. @@ -1405,7 +1412,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } // #. begin a new transaction @@ -1466,7 +1473,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } // #. begin a new transaction @@ -1496,7 +1503,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // remove the all indexes in NC try { for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } } catch (Exception e2) { // do no throw exception since still the metadata needs to be compensated. @@ -1659,7 +1666,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (spec != null) { - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); } } catch (Exception e) { if (bActiveTxn) { @@ -1725,7 +1732,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (jobSpec == null) { return jobSpec; } - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } finally { locker.unlock(); } @@ -1753,7 +1760,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen bActiveTxn = false; if (jobSpec != null && !compileOnly) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } return jobSpec; } catch (Exception e) { @@ -1935,7 +1942,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } else { JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider, MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName())); - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName); } @@ -2022,6 +2029,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen activeEventHandler.registerListener(listener); IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED); feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); + + // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. + // We will need to design general exception handling mechanism for feeds. JobUtils.runJob(hcc, feedJob, Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION))); eventSubscriber.sync(); @@ -2211,7 +2221,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. run the jobs for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + runJob(hcc, jobSpec); } } catch (Exception e) { if (bActiveTxn) { @@ -2300,14 +2310,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } break; case IMMEDIATE: - createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> { + createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { final ResultReader resultReader = new ResultReader(hdc, id, resultSetId); ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); }, clientContextId, ctx); break; case DEFERRED: - createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> { + createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId)); if (outMetadata != null) { outMetadata.getResultSets() @@ -2325,7 +2335,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ResultSetId resultSetId, MutableBoolean printed) { Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID); try { - createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> { + createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> { final ResultHandle handle = new ResultHandle(id, resultSetId); ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING); ResultUtil.printResultHandle(sessionOutput, handle); @@ -2353,16 +2363,20 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - private static void createAndRunJob(IHyracksClientConnection hcc, Mutable<JobId> jId, IStatementCompiler compiler, - IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId, - IStatementExecutorContext ctx) throws Exception { + private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception { + JobUtils.runJob(hcc, jobSpec, jobFlags, true); + } + + private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, + IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, + String clientContextId, IStatementExecutorContext ctx) throws Exception { locker.lock(); try { final JobSpecification jobSpec = compiler.compile(); if (jobSpec == null) { return; } - final JobId jobId = JobUtils.runJob(hcc, jobSpec, false); + final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); if (ctx != null && clientContextId != null) { ctx.put(clientContextId, jobId); // Adds the running job into the context. } @@ -2507,14 +2521,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen transactionState = TransactionState.BEGIN; // run the files update job - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); for (Index index : indexes) { if (!ExternalIndexingOperations.isFileIndex(index)) { spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles, appendedFiles, metadataProvider); // run the files update job - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); } } @@ -2533,7 +2547,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen bActiveTxn = false; transactionState = TransactionState.READY_TO_COMMIT; // We don't release the latch since this job is expected to be quick - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); // Start a new metadata transaction to record the final state of the transaction mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -2602,7 +2616,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; try { - JobUtils.runJob(hcc, spec, true); + runJob(hcc, spec); } catch (Exception e2) { // This should never happen -- fix throw illegal e.addSuppressed(e2); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index f99793a..7d4b41d 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -205,7 +205,7 @@ public class TestNodeController { NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false); BTreeSearchOperatorNodePushable searchOp = searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1); - emptyTupleOp.setFrameWriter(0, searchOp, + emptyTupleOp.setOutputFrameWriter(0, searchOp, primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0)); searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc); return emptyTupleOp; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java index 5e3da5f..b80f8f8 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java @@ -40,6 +40,9 @@ import org.apache.asterix.lang.common.statement.RunStatement; import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.SessionOutput; +import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.control.common.controllers.CCConfig; import org.junit.Assert; import org.junit.Test; @@ -59,6 +62,11 @@ public class QueryTranslatorTest { ExternalProperties mockAsterixExternalProperties = mock(ExternalProperties.class); when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties); when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002); + ICCServiceContext mockServiceContext = mock(ICCServiceContext.class); + when(mockAsterixAppContextInfo.getServiceContext()).thenReturn(mockServiceContext); + IApplicationConfig mockApplicationConfig = mock(IApplicationConfig.class); + when(mockServiceContext.getAppConfig()).thenReturn(mockApplicationConfig); + when(mockApplicationConfig.getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)).thenReturn(true); // Mocks AsterixClusterProperties. Cluster mockCluster = mock(Cluster.class); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java index 41f9e67..cacbfbc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java @@ -19,7 +19,10 @@ package org.apache.asterix.common.utils; +import java.util.EnumSet; + import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; @@ -32,8 +35,13 @@ public class JobUtils { public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion) throws Exception { + return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion); + } + + public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags, + boolean waitForCompletion) throws Exception { spec.setMaxReattempts(0); - final JobId jobId = hcc.startJob(spec); + final JobId jobId = hcc.startJob(spec, jobFlags); if (waitForCompletion) { hcc.waitForCompletion(jobId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java index 71acecf..d0b7b47 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java @@ -21,28 +21,20 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical; import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang3.mutable.Mutable; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; -import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor; -import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; public class SinkPOperator extends AbstractPhysicalOperator { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java new file mode 100644 index 0000000..26002e6 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.runtime.base; + +import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; + +public class EnforcePushRuntime extends EnforceFrameWriter implements IPushRuntime { + + private final IPushRuntime pushRuntime; + + private EnforcePushRuntime(IPushRuntime pushRuntime) { + super(pushRuntime); + this.pushRuntime = pushRuntime; + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + pushRuntime.setOutputFrameWriter(index, writer, recordDesc); + } + + @Override + public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + pushRuntime.setInputRecordDescriptor(index, recordDescriptor); + } + + public static IPushRuntime enforce(IPushRuntime pushRuntime) { + return pushRuntime instanceof EnforcePushRuntime || pushRuntime instanceof NestedTupleSourceRuntime + ? pushRuntime : new EnforcePushRuntime(pushRuntime); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java index 27d6900..de00697 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java @@ -22,7 +22,26 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; public interface IPushRuntime extends IFrameWriter { - public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc); - public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor); + /** + * Sets the output frame writer for this writer. + * + * @param index, + * the index of the output channel. + * @param writer, + * the writer for writing output. + * @param recordDesc, + * the output record descriptor. + */ + void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc); + + /** + * Sets the input record descriptor for this writer. + * + * @param index, + * the index of the input channel. + * @param recordDescriptor, + * the corresponding input record descriptor. + */ + void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java index 42e5157..e99b61b 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java @@ -124,6 +124,7 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac @Override public void fail() throws HyracksDataException { + failed = true; if (isOpen) { writer.fail(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java index b397f23..893aa61 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java @@ -21,6 +21,7 @@ package org.apache.hyracks.algebricks.runtime.operators.aggreg; import java.nio.ByteBuffer; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; @@ -29,6 +30,7 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory; @@ -52,7 +54,6 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati @Override public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException { - final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length); final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { @@ -91,8 +92,8 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati } @Override - public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex, - AggregateState state) throws HyracksDataException { + public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, + int tIndex, AggregateState state) throws HyracksDataException { for (int i = 0; i < pipelines.length; i++) { outputWriter.setInputIdx(i); pipelines[i].close(); @@ -144,9 +145,13 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati IFrameWriter start = writer; IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); for (int i = runtimeFactories.length - 1; i >= 0; i--) { IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx); - newRuntime.setFrameWriter(0, start, recordDescriptors[i]); + newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + start = enforce ? EnforcePushRuntime.enforce(start) : start; + newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); if (i > 0) { newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); } else { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java index 52245e1..8b8e320 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java @@ -21,6 +21,7 @@ package org.apache.hyracks.algebricks.runtime.operators.aggreg; import java.nio.ByteBuffer; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime; @@ -28,8 +29,10 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -58,11 +61,14 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, final IFrameWriter writer) throws HyracksDataException { - final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, - decorFieldIdx.length, writer); + final RunningAggregatorOutput outputWriter = + new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer); + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); + IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter; final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { - pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx); + pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], enforcedWriter, ctx); } final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder(); @@ -136,13 +142,17 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); // plug the operators IFrameWriter start = writer; IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); for (int i = runtimeFactories.length - 1; i >= 0; i--) { IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx); - newRuntime.setFrameWriter(0, start, recordDescriptors[i]); + newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + start = enforce ? EnforceFrameWriter.enforce(start) : start; + newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); if (i > 0) { newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); } else { @@ -206,8 +216,9 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto int start = 0; int offset = 0; for (int i = 0; i < fieldEnds.length; i++) { - if (i > 0) + if (i > 0) { start = fieldEnds[i - 1]; + } offset = fieldEnds[i] - start; tb.addField(data, start, offset); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java index 33e7c73..5cced8d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java @@ -29,7 +29,7 @@ public abstract class AbstractOneInputPushRuntime implements IPushRuntime { protected boolean failed; @Override - public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { this.writer = writer; this.outputRecordDesc = recordDesc; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java index e430461..d47199d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java @@ -26,7 +26,7 @@ public abstract class AbstractOneInputSinkPushRuntime implements IPushRuntime { protected RecordDescriptor inputRecordDesc; @Override - public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { throw new IllegalStateException(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java index b7707d4..35563e0 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java @@ -27,20 +27,29 @@ public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInput @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + // nextFrame will never be called on this runtime throw new UnsupportedOperationException(); } @Override public void close() throws HyracksDataException { + // close is a no op since this operator completes operating in open() } @Override public void fail() throws HyracksDataException { - writer.fail(); + // fail is a no op since if a failure happened, the operator would've already called fail() on downstream + } + + @Override + public void flush() throws HyracksDataException { + // flush will never be called on this runtime + throw new UnsupportedOperationException(); } @Override public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + // setInputRecordDescriptor will never be called on this runtime since it has no input throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index 1294614..f6ebf19 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -112,6 +112,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { private IFrameWriter startOfPipeline; + private boolean opened = false; @Override public void open() throws HyracksDataException { @@ -124,6 +125,7 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor); startOfPipeline = pa.assemblePipeline(writer, ctx); } + opened = true; startOfPipeline.open(); } @@ -134,12 +136,16 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper @Override public void close() throws HyracksDataException { - startOfPipeline.close(); + if (opened) { + startOfPipeline.close(); + } } @Override public void fail() throws HyracksDataException { - startOfPipeline.fail(); + if (opened) { + startOfPipeline.fail(); + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index 03e2aaf..e1081e0 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -19,11 +19,14 @@ package org.apache.hyracks.algebricks.runtime.operators.meta; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; public class PipelineAssembler { @@ -44,18 +47,21 @@ public class PipelineAssembler { this.outputArity = outputArity; } - public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws - HyracksDataException { + public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { + // should enforce protocol + boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); // plug the operators IFrameWriter start = writer;// this.writer; for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) { IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx); + newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + start = enforce ? EnforceFrameWriter.enforce(start) : start; if (i == pipeline.getRuntimeFactories().length - 1) { if (outputArity == 1) { - newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor); + newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor); } } else { - newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]); + newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]); } if (i > 0) { newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java index 3e30f73..925ff93 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java @@ -79,11 +79,6 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime } @Override - public void fail() throws HyracksDataException { - writer.fail(); - } - - @Override public void close() throws HyracksDataException { try { frameSorter.sort(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java index 2b7c2da..3ccceed 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java @@ -56,6 +56,11 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory { } @Override + public void fail() throws HyracksDataException { + writer.fail(); + } + + @Override public void close() throws HyracksDataException { writer.close(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java index e123adf..496679f 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java @@ -65,11 +65,6 @@ public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory { } @Override - public void fail() throws HyracksDataException { - writer.fail(); - } - - @Override public void flush() throws HyracksDataException { writer.flush(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java index 38fe7d1..33b7725 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java @@ -118,7 +118,7 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun @Override public void fail() throws HyracksDataException { if (isOpen) { - super.fail(); + writer.fail(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java index ebf3d3a..55146e2 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java @@ -81,6 +81,7 @@ public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime { @Override public void fail() throws HyracksDataException { + // fail() is a no op. in close we will cleanup } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java index 0f57fd7..171544d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java @@ -85,7 +85,6 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime private IScalarEvaluator eval; private IMissingWriter missingWriter = null; private ArrayTupleBuilder missingTupleBuilder = null; - private boolean isOpen = false; @Override public void open() throws HyracksDataException { @@ -93,7 +92,6 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime initAccessAppendFieldRef(ctx); eval = cond.createScalarEvaluator(ctx); } - isOpen = true; writer.open(); //prepare nullTupleBuilder @@ -107,20 +105,11 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime } @Override - public void fail() throws HyracksDataException { - if (isOpen) { - super.fail(); - } - } - - @Override public void close() throws HyracksDataException { - if (isOpen) { - try { - flushIfNotFailed(); - } finally { - writer.close(); - } + try { + flushIfNotFailed(); + } finally { + writer.close(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index aa9232e..e2868ae 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -177,7 +177,7 @@ public class HyracksClientInterfaceFunctions { } public StartJobFunction(JobId jobId) { - this(null, null, null, jobId); + this(null, null, EnumSet.noneOf(JobFlag.class), jobId); } public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index ad54110..75cbf61 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -102,14 +102,14 @@ public final class HyracksConnection implements IHyracksClientConnection { @Override public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception { - JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory( - jobSpec); + IActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); return startJob(jsacggf, jobFlags); } @Override public JobId distributeJob(JobSpecification jobSpec) throws Exception { - JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = + IActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); return distributeJob(jsacggf); } @@ -212,15 +212,14 @@ public final class HyracksConnection implements IHyracksClientConnection { @Override public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception { - JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory( - jobSpec); + IActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); return startJob(deploymentId, jsacggf, jobFlags); } @Override public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, - EnumSet<JobFlag> jobFlags) - throws Exception { + EnumSet<JobFlag> jobFlags) throws Exception { return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java index c8e4cf8..df693b2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java @@ -19,6 +19,7 @@ package org.apache.hyracks.api.context; import java.io.Serializable; +import java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.hyracks.api.dataflow.TaskAttemptId; @@ -26,6 +27,7 @@ import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.io.IWorkspaceFileFactory; import org.apache.hyracks.api.job.IOperatorEnvironment; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatableRegistry; @@ -48,4 +50,6 @@ public interface IHyracksTaskContext void setSharedObject(Object object); Object getSharedObject(); + + Set<JobFlag> getJobFlags(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java new file mode 100644 index 0000000..bf54e01 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.dataflow; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class EnforceFrameWriter implements IFrameWriter { + + // The downstream data consumer of this writer. + private final IFrameWriter writer; + + // A flag that indicates whether the data consumer of this writer has failed. + private boolean downstreamFailed = false; + + // A flag that indicates whether the the data producer of this writer has called fail() for this writer. + // There could be two cases: + // CASE 1: the downstream of this writer fails and the exception is propagated to the source operator, which + // cascades to the fail() of this writer; + // CASE 2: the failure happens in the upstream of this writer and the source operator cascades to the fail() + // of this writer. + private boolean failCalledByUpstream = false; + + // A flag that indicates whether the downstream of this writer is open. + private boolean downstreamOpen = false; + + protected EnforceFrameWriter(IFrameWriter writer) { + this.writer = writer; + } + + @Override + public final void open() throws HyracksDataException { + try { + if (downstreamOpen) { + throw HyracksDataException.create(ErrorCode.OPEN_ON_OPEN_WRITER); + } + if (downstreamFailed || failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.OPEN_ON_FAILED_WRITER); + } + writer.open(); + downstreamOpen = true; + } catch (Throwable th) { + downstreamFailed = true; + throw th; + } + } + + @Override + public final void nextFrame(ByteBuffer buffer) throws HyracksDataException { + if (!downstreamOpen) { + throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_CLOSED_WRITER); + } + if (downstreamFailed || failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_FAILED_WRITER); + } + try { + writer.nextFrame(buffer); + } catch (Throwable th) { + downstreamFailed = true; + throw th; + } + } + + @Override + public final void flush() throws HyracksDataException { + if (!downstreamOpen) { + throw HyracksDataException.create(ErrorCode.FLUSH_ON_CLOSED_WRITER); + } + if (downstreamFailed || failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.FLUSH_ON_FAILED_WRITER); + } + try { + writer.flush(); + } catch (Throwable th) { + downstreamFailed = true; + throw th; + } + } + + @Override + public final void fail() throws HyracksDataException { + writer.fail(); + if (failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.FAIL_ON_FAILED_WRITER); + } + failCalledByUpstream = true; + } + + @Override + public void close() throws HyracksDataException { + writer.close(); + downstreamOpen = false; + if (downstreamFailed && !failCalledByUpstream) { + throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL); + } + } + + public static IFrameWriter enforce(IFrameWriter writer) { + return writer instanceof EnforceFrameWriter ? writer : new EnforceFrameWriter(writer); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java index f6c201e..82433e8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java @@ -23,16 +23,15 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IOperatorNodePushable { - public void initialize() throws HyracksDataException; + void initialize() throws HyracksDataException; - public void deinitialize() throws HyracksDataException; + void deinitialize() throws HyracksDataException; - public int getInputArity(); + int getInputArity(); - public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) - throws HyracksDataException; + void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException; - public IFrameWriter getInputFrameWriter(int index); + IFrameWriter getInputFrameWriter(int index); - public String getDisplayName(); + String getDisplayName(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index b52a6a5..8f36fcd 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -92,6 +92,14 @@ public class ErrorCode { public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56; public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57; public static final int TASK_ABORTED = 58; + public static final int OPEN_ON_OPEN_WRITER = 59; + public static final int OPEN_ON_FAILED_WRITER = 60; + public static final int NEXT_FRAME_ON_FAILED_WRITER = 61; + public static final int NEXT_FRAME_ON_CLOSED_WRITER = 62; + public static final int FLUSH_ON_FAILED_WRITER = 63; + public static final int FLUSH_ON_CLOSED_WRITER = 64; + public static final int FAIL_ON_FAILED_WRITER = 65; + public static final int MISSED_FAIL_CALL = 66; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java index a33c6c9..7225cd4 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java @@ -19,5 +19,6 @@ package org.apache.hyracks.api.job; public enum JobFlag { - PROFILE_RUNTIME + PROFILE_RUNTIME, + ENFORCE_CONTRACT } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 314bf8b..7fdf106 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -35,12 +35,14 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.EnforceFrameWriter; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; /** * The runtime of a SuperActivity, which internally executes a DAG of one-to-one @@ -90,18 +92,18 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable private void init() throws HyracksDataException { Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>(); List<IConnectorDescriptor> outputConnectors; - + final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); /* * Set up the source operators */ for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) { - IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, - nPartitions); + IOperatorNodePushable opPushable = + entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); operatorNodePushablesBFSOrder.add(opPushable); operatorNodePushables.put(entry.getKey(), opPushable); inputArity += opPushable.getInputArity(); - outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), - Collections.emptyList()); + outputConnectors = + MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList()); for (IConnectorDescriptor conn : outputConnectors) { childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId())); } @@ -131,7 +133,9 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable /* * construct the dataflow connection from a producer to a consumer */ - sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel), + IFrameWriter writer = destOp.getInputFrameWriter(inputChannel); + writer = enforce ? EnforceFrameWriter.enforce(writer) : writer; + sourceOp.setOutputFrameWriter(outputChannel, writer, recordDescProvider.getInputRecordDescriptor(destId, inputChannel)); /* http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 35a2fc5..4bf069c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -75,6 +75,13 @@ 56 = LSM disk component scan is not allowed for a secondary index 57 = Couldn't find the matter tuple for anti-matter tuple in the primary index 58 = Task %1$s was aborted +59 = Data pipeline protocol violation: open() is called on a opened writer +60 = Data pipeline protocol violation: open() is called on a failed writer +61 = Data pipeline protocol violation: nextFrame() is called on a failed writer +62 = Data pipeline protocol violation: nextFrame() is called on a closed writer +63 = Data pipeline protocol violation: flush() is called on a failed writer +64 = Data pipeline protocol violation: flush() is called on a closed writer +65 = Data pipeline protocol violation: fail() is called twice on a writer +66 = Data pipeline protocol violation: fail() is not called by the upstream when there is a failure in the downstream -# 10000 ---- 19999: compilation errors 10000 = The given rule collection %1$s is not an instance of the List class. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index 55a7a82..95a6d9b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -20,7 +20,6 @@ package org.apache.hyracks.control.cc.job; import java.io.PrintWriter; import java.io.StringWriter; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -116,10 +115,10 @@ public class JobRun implements IJobStatusConditionVariable { } //Run a Pre-distributed job by passing the JobId - public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, + public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, PreDistributedJobDescriptor distributedJobDescriptor) throws HyracksException { - this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), + this(deploymentId, jobId, jobFlags, distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph()); Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints(); this.scheduler = new JobExecutor(ccs, this, constaints, true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8cf8be67/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java index 2dbb631..e083d2a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java @@ -68,7 +68,7 @@ public class JobStartWork extends SynchronizableWork { run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags); } else { //ActivityClusterGraph has already been distributed - run = new JobRun(ccs, deploymentId, jobId, + run = new JobRun(ccs, deploymentId, jobId, jobFlags, ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId)); } jobManager.add(run);
