Repository: asterixdb Updated Branches: refs/heads/master 913443086 -> f7100f704
Let SPLIT operator work as expected - Let SPLIT operator function as expected in the optimization framework by referencing the information for the REPLICATE operator Change-Id: I999288ea4cf286e34d735a840843bf161876d3e3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1542 Reviewed-by: Yingyi Bu <buyin...@gmail.com> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f7100f70 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f7100f70 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f7100f70 Branch: refs/heads/master Commit: f7100f704ccc740fd3bb995abbdd42ebc7044da6 Parents: 9134430 Author: Taewoo Kim <wangs...@yahoo.com> Authored: Fri Mar 3 13:31:19 2017 -0800 Committer: Taewoo Kim <wangs...@gmail.com> Committed: Fri Mar 3 17:10:45 2017 -0800 ---------------------------------------------------------------------- .../logical/AbstractReplicateOperator.java | 9 ++++ .../operators/logical/ReplicateOperator.java | 8 ---- .../operators/logical/SplitOperator.java | 20 ++++++++ .../operators/physical/SplitPOperator.java | 4 +- .../core/jobgen/impl/PlanCompiler.java | 8 ++-- .../rules/ExtractCommonExpressionsRule.java | 7 +-- .../rules/ExtractCommonOperatorsRule.java | 5 +- .../rewriter/rules/PushProjectDownRule.java | 1 + .../rewriter/rules/PushSelectDownRule.java | 2 +- .../SetAlgebricksPhysicalOperatorsRule.java | 4 ++ .../operators/std/SplitOperatorDescriptor.java | 48 ++++++++++++++++---- 11 files changed, 89 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java index f883687..852c392 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java @@ -100,4 +100,13 @@ public abstract class AbstractReplicateOperator extends AbstractLogicalOperator return createPropagatingAllInputsTypeEnvironment(ctx); } + public boolean isBlocker() { + for (boolean requiresMaterialization : outputMaterializationFlags) { + if (requiresMaterialization) { + return true; + } + } + return false; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java index 2d2fd0f..0499327 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java @@ -42,12 +42,4 @@ public class ReplicateOperator extends AbstractReplicateOperator { return visitor.visitReplicateOperator(this, arg); } - public boolean isBlocker() { - for (boolean requiresMaterialization : outputMaterializationFlags) { - if (requiresMaterialization) { - return true; - } - } - return false; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java index a996673..7be761b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java @@ -32,10 +32,22 @@ public class SplitOperator extends AbstractReplicateOperator { // Expression that keeps the output branch information for each tuple private final Mutable<ILogicalExpression> branchingExpression; + // Default branch when there is no value from the given branching expression. The default is 0. + private final int defaultBranch; + // When the following is set to true, defaultBranch will be ignored and incoming tuples will be + // propagated to all output branches. The default is false. + private final boolean propageToAllBranchAsDefault; public SplitOperator(int outputArity, Mutable<ILogicalExpression> branchingExpression) { + this(outputArity, branchingExpression, 0, false); + } + + public SplitOperator(int outputArity, Mutable<ILogicalExpression> branchingExpression, int defaultBranch, + boolean propageToAllBranchForMissingExprValue) { super(outputArity); this.branchingExpression = branchingExpression; + this.defaultBranch = defaultBranch; + this.propageToAllBranchAsDefault = propageToAllBranchForMissingExprValue; } @Override @@ -52,6 +64,14 @@ public class SplitOperator extends AbstractReplicateOperator { return branchingExpression; } + public int getDefaultBranch() { + return defaultBranch; + } + + public boolean getPropageToAllBranchAsDefault() { + return propageToAllBranchAsDefault; + } + @Override public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException { return visitor.transform(branchingExpression); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java index 3b8aaab..923e56a 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java @@ -47,6 +47,8 @@ public class SplitPOperator extends AbstractReplicatePOperator { throws AlgebricksException { SplitOperator sop = (SplitOperator) op; int outputArity = sop.getOutputArity(); + int defaultBranch = sop.getDefaultBranch(); + boolean propageToAllBranchAsDefault = sop.getPropageToAllBranchAsDefault(); IOperatorDescriptorRegistry spec = builder.getJobSpec(); RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), @@ -59,7 +61,7 @@ public class SplitPOperator extends AbstractReplicatePOperator { IBinaryIntegerInspectorFactory intInsepctorFactory = context.getBinaryIntegerInspectorFactory(); SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, - brachingExprEvalFactory, intInsepctorFactory); + brachingExprEvalFactory, intInsepctorFactory, defaultBranch, propageToAllBranchAsDefault); contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java index 1a61f2e..2960903 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java @@ -25,14 +25,13 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.commons.lang3.mutable.Mutable; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.job.JobSpecification; @@ -107,8 +106,9 @@ public class PlanCompiler { Mutable<ILogicalOperator> child = entry.getKey(); List<Mutable<ILogicalOperator>> parents = entry.getValue(); if (parents.size() > 1) { - if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) { - ReplicateOperator rop = (ReplicateOperator) child.getValue(); + if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE + || child.getValue().getOperatorTag() == LogicalOperatorTag.SPLIT) { + AbstractReplicateOperator rop = (AbstractReplicateOperator) child.getValue(); if (rop.isBlocker()) { // make the order of the graph edges consistent with the order of rop's outputs List<Mutable<ILogicalOperator>> outputs = rop.getOutputs(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java index 60275dd..f51c9ea 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java @@ -146,9 +146,10 @@ public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule { } } - // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, since we want to avoid incorrect expression replacement - // (the resulting new variables should be assigned live below a replicate). - if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE) { + // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, + // since we want to avoid incorrect expression replacement + // (the resulting new variables should be assigned live below a replicate/split). + if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE || op.getOperatorTag() == LogicalOperatorTag.SPLIT) { exprEqClassMap.clear(); return modified; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java index 474cc73..5a4cacd 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java @@ -459,9 +459,10 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule { private void computeClusters(Mutable<ILogicalOperator> parentRef, Mutable<ILogicalOperator> opRef, MutableInt currentClusterId) { - // only replicate operator has multiple outputs + // only replicate or split operator has multiple outputs int outputIndex = 0; - if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) { + if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE + || opRef.getValue().getOperatorTag() == LogicalOperatorTag.SPLIT) { ReplicateOperator rop = (ReplicateOperator) opRef.getValue(); List<Mutable<ILogicalOperator>> outputs = rop.getOutputs(); for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java index 2d57e8d..88c0ea9 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java @@ -87,6 +87,7 @@ public class PushProjectDownRule implements IAlgebraicRewriteRule { || op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE || op2.getOperatorTag() == LogicalOperatorTag.PROJECT || op2.getOperatorTag() == LogicalOperatorTag.REPLICATE + || op2.getOperatorTag() == LogicalOperatorTag.SPLIT || op2.getOperatorTag() == LogicalOperatorTag.UNIONALL) { return new Pair<Boolean, Boolean>(false, false); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java index aab6d12..29998c2 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java @@ -58,7 +58,7 @@ public class PushSelectDownRule implements IAlgebraicRewriteRule { LogicalOperatorTag tag2 = op2.getOperatorTag(); if (tag2 == LogicalOperatorTag.INNERJOIN || tag2 == LogicalOperatorTag.LEFTOUTERJOIN - || tag2 == LogicalOperatorTag.REPLICATE) { + || tag2 == LogicalOperatorTag.REPLICATE || tag2 == LogicalOperatorTag.SPLIT) { return false; } else { // not a join boolean res = propagateSelectionRec(opRef, opRef2); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 6fdcfdf..7dc59f6 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -73,6 +73,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePO import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.SplitPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator; @@ -236,6 +237,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule op.setPhysicalOperator(new ReplicatePOperator()); break; } + case SPLIT: + op.setPhysicalOperator(new SplitPOperator()); + break; case SCRIPT: { op.setPhysicalOperator(new StringStreamingScriptPOperator()); break; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java index be39208..508b1aa 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java @@ -49,14 +49,24 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushab public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor { private static final long serialVersionUID = 1L; - private IScalarEvaluatorFactory brachingExprEvalFactory; - private IBinaryIntegerInspectorFactory intInsepctorFactory; + private final IScalarEvaluatorFactory brachingExprEvalFactory; + private final IBinaryIntegerInspectorFactory intInsepctorFactory; + private final int defaultBranch; + private final boolean propageToAllBranchAsDefault; public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity, IScalarEvaluatorFactory brachingExprEvalFactory, IBinaryIntegerInspectorFactory intInsepctorFactory) { + this(spec, rDesc, outputArity, brachingExprEvalFactory, intInsepctorFactory, 0, false); + } + + public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity, + IScalarEvaluatorFactory brachingExprEvalFactory, IBinaryIntegerInspectorFactory intInsepctorFactory, + int defaultBranch, boolean propageToAllBranchAsDefault) { super(spec, rDesc, outputArity); this.brachingExprEvalFactory = brachingExprEvalFactory; this.intInsepctorFactory = intInsepctorFactory; + this.defaultBranch = defaultBranch; + this.propageToAllBranchAsDefault = propageToAllBranchAsDefault; } @Override @@ -91,8 +101,7 @@ public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor final FrameTupleAppender[] appenders = new FrameTupleAppender[numberOfNonMaterializedOutputs]; final FrameTupleReference tRef = new FrameTupleReference();; final IBinaryIntegerInspector intInsepctor = intInsepctorFactory.createBinaryIntegerInspector(ctx); - final IScalarEvaluator eval; - eval = brachingExprEvalFactory.createScalarEvaluator(ctx); + final IScalarEvaluator eval = brachingExprEvalFactory.createScalarEvaluator(ctx); for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { appenders[i] = new FrameTupleAppender(new VSizeFrame(ctx), true); } @@ -112,17 +121,40 @@ public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor accessor.reset(bufferAccessor); int tupleCount = accessor.getTupleCount(); // The output branch number that starts from 0. - int outputBranch; + int outputBranch = defaultBranch; + boolean correctBranchValue; for (int i = 0; i < tupleCount; i++) { // Get the output branch number from the field in the given tuple. tRef.reset(accessor, i); eval.evaluate(tRef, p); - outputBranch = intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(), - p.getLength()); + correctBranchValue = true; + + try { + outputBranch = + intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength()); + + if (outputBranch < 0 || outputBranch >= outputArity) { + correctBranchValue = false; + } + } catch (Exception e) { + correctBranchValue = false; + } // Add this tuple to the correct output frame. - FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i); + if (correctBranchValue) { + FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i); + } else { + // Need to propagate to the all branches? + if (!propageToAllBranchAsDefault) { + FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i); + } else { + for (int j = 0; j < outputArity; j++) { + FrameUtils.appendToWriter(writers[j], appenders[j], accessor, i); + } + } + } + } }