Repository: asterixdb
Updated Branches:
  refs/heads/master 913443086 -> f7100f704


Let SPLIT operator work as expected

 - Let the SPLIT operator function as expected in the optimization framework
   by reusing the handling already in place for the REPLICATE operator

Change-Id: I999288ea4cf286e34d735a840843bf161876d3e3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1542
Reviewed-by: Yingyi Bu <buyin...@gmail.com>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f7100f70
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f7100f70
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f7100f70

Branch: refs/heads/master
Commit: f7100f704ccc740fd3bb995abbdd42ebc7044da6
Parents: 9134430
Author: Taewoo Kim <wangs...@yahoo.com>
Authored: Fri Mar 3 13:31:19 2017 -0800
Committer: Taewoo Kim <wangs...@gmail.com>
Committed: Fri Mar 3 17:10:45 2017 -0800

----------------------------------------------------------------------
 .../logical/AbstractReplicateOperator.java      |  9 ++++
 .../operators/logical/ReplicateOperator.java    |  8 ----
 .../operators/logical/SplitOperator.java        | 20 ++++++++
 .../operators/physical/SplitPOperator.java      |  4 +-
 .../core/jobgen/impl/PlanCompiler.java          |  8 ++--
 .../rules/ExtractCommonExpressionsRule.java     |  7 +--
 .../rules/ExtractCommonOperatorsRule.java       |  5 +-
 .../rewriter/rules/PushProjectDownRule.java     |  1 +
 .../rewriter/rules/PushSelectDownRule.java      |  2 +-
 .../SetAlgebricksPhysicalOperatorsRule.java     |  4 ++
 .../operators/std/SplitOperatorDescriptor.java  | 48 ++++++++++++++++----
 11 files changed, 89 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
index f883687..852c392 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -100,4 +100,13 @@ public abstract class AbstractReplicateOperator extends 
AbstractLogicalOperator
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
 
+    public boolean isBlocker() {
+        for (boolean requiresMaterialization : outputMaterializationFlags) {
+            if (requiresMaterialization) {
+                return true;
+            }
+        }
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 2d2fd0f..0499327 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -42,12 +42,4 @@ public class ReplicateOperator extends 
AbstractReplicateOperator {
         return visitor.visitReplicateOperator(this, arg);
     }
 
-    public boolean isBlocker() {
-        for (boolean requiresMaterialization : outputMaterializationFlags) {
-            if (requiresMaterialization) {
-                return true;
-            }
-        }
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
index a996673..7be761b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
@@ -32,10 +32,22 @@ public class SplitOperator extends 
AbstractReplicateOperator {
 
     // Expression that keeps the output branch information for each tuple
     private final Mutable<ILogicalExpression> branchingExpression;
+    // Default branch when there is no value from the given branching 
expression. The default is 0.
+    private final int defaultBranch;
+    // When the following is set to true, defaultBranch will be ignored and 
incoming tuples will be
+    // propagated to all output branches. The default is false.
+    private final boolean propageToAllBranchAsDefault;
 
     public SplitOperator(int outputArity, Mutable<ILogicalExpression> 
branchingExpression) {
+        this(outputArity, branchingExpression, 0, false);
+    }
+
+    public SplitOperator(int outputArity, Mutable<ILogicalExpression> 
branchingExpression, int defaultBranch,
+            boolean propageToAllBranchForMissingExprValue) {
         super(outputArity);
         this.branchingExpression = branchingExpression;
+        this.defaultBranch = defaultBranch;
+        this.propageToAllBranchAsDefault = 
propageToAllBranchForMissingExprValue;
     }
 
     @Override
@@ -52,6 +64,14 @@ public class SplitOperator extends AbstractReplicateOperator 
{
         return branchingExpression;
     }
 
+    public int getDefaultBranch() {
+        return defaultBranch;
+    }
+
+    public boolean getPropageToAllBranchAsDefault() {
+        return propageToAllBranchAsDefault;
+    }
+
     @Override
     public boolean 
acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws 
AlgebricksException {
         return visitor.transform(branchingExpression);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
index 3b8aaab..923e56a 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
@@ -47,6 +47,8 @@ public class SplitPOperator extends 
AbstractReplicatePOperator {
             throws AlgebricksException {
         SplitOperator sop = (SplitOperator) op;
         int outputArity = sop.getOutputArity();
+        int defaultBranch = sop.getDefaultBranch();
+        boolean propageToAllBranchAsDefault = 
sop.getPropageToAllBranchAsDefault();
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         RecordDescriptor recDescriptor = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
@@ -59,7 +61,7 @@ public class SplitPOperator extends 
AbstractReplicatePOperator {
         IBinaryIntegerInspectorFactory intInsepctorFactory = 
context.getBinaryIntegerInspectorFactory();
 
         SplitOperatorDescriptor splitOpDesc = new 
SplitOperatorDescriptor(spec, recDescriptor, outputArity,
-                brachingExprEvalFactory, intInsepctorFactory);
+                brachingExprEvalFactory, intInsepctorFactory, defaultBranch, 
propageToAllBranchAsDefault);
 
         contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 1a61f2e..2960903 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -25,14 +25,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -107,8 +106,9 @@ public class PlanCompiler {
             Mutable<ILogicalOperator> child = entry.getKey();
             List<Mutable<ILogicalOperator>> parents = entry.getValue();
             if (parents.size() > 1) {
-                if (child.getValue().getOperatorTag() == 
LogicalOperatorTag.REPLICATE) {
-                    ReplicateOperator rop = (ReplicateOperator) 
child.getValue();
+                if (child.getValue().getOperatorTag() == 
LogicalOperatorTag.REPLICATE
+                        || child.getValue().getOperatorTag() == 
LogicalOperatorTag.SPLIT) {
+                    AbstractReplicateOperator rop = 
(AbstractReplicateOperator) child.getValue();
                     if (rop.isBlocker()) {
                         // make the order of the graph edges consistent with 
the order of rop's outputs
                         List<Mutable<ILogicalOperator>> outputs = 
rop.getOutputs();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index 60275dd..f51c9ea 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -146,9 +146,10 @@ public class ExtractCommonExpressionsRule implements 
IAlgebraicRewriteRule {
             }
         }
 
-        // TODO: Deal with replicate properly. Currently, we just clear the 
expr equivalence map, since we want to avoid incorrect expression replacement
-        // (the resulting new variables should be assigned live below a 
replicate).
-        if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+        // TODO: Deal with replicate properly. Currently, we just clear the 
expr equivalence map,
+        // since we want to avoid incorrect expression replacement
+        // (the resulting new variables should be assigned live below a 
replicate/split).
+        if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE || 
op.getOperatorTag() == LogicalOperatorTag.SPLIT) {
             exprEqClassMap.clear();
             return modified;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 474cc73..5a4cacd 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -459,9 +459,10 @@ public class ExtractCommonOperatorsRule implements 
IAlgebraicRewriteRule {
 
     private void computeClusters(Mutable<ILogicalOperator> parentRef, 
Mutable<ILogicalOperator> opRef,
             MutableInt currentClusterId) {
-        // only replicate operator has multiple outputs
+        // only replicate or split operator has multiple outputs
         int outputIndex = 0;
-        if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) 
{
+        if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE
+                || opRef.getValue().getOperatorTag() == 
LogicalOperatorTag.SPLIT) {
             ReplicateOperator rop = (ReplicateOperator) opRef.getValue();
             List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
             for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) 
{

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
index 2d57e8d..88c0ea9 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
@@ -87,6 +87,7 @@ public class PushProjectDownRule implements 
IAlgebraicRewriteRule {
                     || op2.getOperatorTag() == 
LogicalOperatorTag.NESTEDTUPLESOURCE
                     || op2.getOperatorTag() == LogicalOperatorTag.PROJECT
                     || op2.getOperatorTag() == LogicalOperatorTag.REPLICATE
+                    || op2.getOperatorTag() == LogicalOperatorTag.SPLIT
                     || op2.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
                 return new Pair<Boolean, Boolean>(false, false);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
index aab6d12..29998c2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
@@ -58,7 +58,7 @@ public class PushSelectDownRule implements 
IAlgebraicRewriteRule {
         LogicalOperatorTag tag2 = op2.getOperatorTag();
 
         if (tag2 == LogicalOperatorTag.INNERJOIN || tag2 == 
LogicalOperatorTag.LEFTOUTERJOIN
-                || tag2 == LogicalOperatorTag.REPLICATE) {
+                || tag2 == LogicalOperatorTag.REPLICATE || tag2 == 
LogicalOperatorTag.SPLIT) {
             return false;
         } else { // not a join
             boolean res = propagateSelectionRec(opRef, opRef2);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 6fdcfdf..7dc59f6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -73,6 +73,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePO
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.SplitPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
@@ -236,6 +237,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements 
IAlgebraicRewriteRule
                     op.setPhysicalOperator(new ReplicatePOperator());
                     break;
                 }
+                case SPLIT:
+                    op.setPhysicalOperator(new SplitPOperator());
+                    break;
                 case SCRIPT: {
                     op.setPhysicalOperator(new 
StringStreamingScriptPOperator());
                     break;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f7100f70/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
index be39208..508b1aa 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -49,14 +49,24 @@ import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushab
 public class SplitOperatorDescriptor extends 
AbstractReplicateOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
-    private IScalarEvaluatorFactory brachingExprEvalFactory;
-    private IBinaryIntegerInspectorFactory intInsepctorFactory;
+    private final IScalarEvaluatorFactory brachingExprEvalFactory;
+    private final IBinaryIntegerInspectorFactory intInsepctorFactory;
+    private final int defaultBranch;
+    private final boolean propageToAllBranchAsDefault;
 
     public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor rDesc, int outputArity,
             IScalarEvaluatorFactory brachingExprEvalFactory, 
IBinaryIntegerInspectorFactory intInsepctorFactory) {
+        this(spec, rDesc, outputArity, brachingExprEvalFactory, 
intInsepctorFactory, 0, false);
+    }
+
+    public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor rDesc, int outputArity,
+            IScalarEvaluatorFactory brachingExprEvalFactory, 
IBinaryIntegerInspectorFactory intInsepctorFactory,
+            int defaultBranch, boolean propageToAllBranchAsDefault) {
         super(spec, rDesc, outputArity);
         this.brachingExprEvalFactory = brachingExprEvalFactory;
         this.intInsepctorFactory = intInsepctorFactory;
+        this.defaultBranch = defaultBranch;
+        this.propageToAllBranchAsDefault = propageToAllBranchAsDefault;
     }
 
     @Override
@@ -91,8 +101,7 @@ public class SplitOperatorDescriptor extends 
AbstractReplicateOperatorDescriptor
             final FrameTupleAppender[] appenders = new 
FrameTupleAppender[numberOfNonMaterializedOutputs];
             final FrameTupleReference tRef = new FrameTupleReference();;
             final IBinaryIntegerInspector intInsepctor = 
intInsepctorFactory.createBinaryIntegerInspector(ctx);
-            final IScalarEvaluator eval;
-            eval = brachingExprEvalFactory.createScalarEvaluator(ctx);
+            final IScalarEvaluator eval = 
brachingExprEvalFactory.createScalarEvaluator(ctx);
             for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
                 appenders[i] = new FrameTupleAppender(new VSizeFrame(ctx), 
true);
             }
@@ -112,17 +121,40 @@ public class SplitOperatorDescriptor extends 
AbstractReplicateOperatorDescriptor
                     accessor.reset(bufferAccessor);
                     int tupleCount = accessor.getTupleCount();
                     // The output branch number that starts from 0.
-                    int outputBranch;
+                    int outputBranch = defaultBranch;
+                    boolean correctBranchValue;
 
                     for (int i = 0; i < tupleCount; i++) {
                         // Get the output branch number from the field in the 
given tuple.
                         tRef.reset(accessor, i);
                         eval.evaluate(tRef, p);
-                        outputBranch = 
intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(),
-                                p.getLength());
+                        correctBranchValue = true;
+
+                        try {
+                            outputBranch =
+                                    
intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(), 
p.getLength());
+
+                            if (outputBranch < 0 || outputBranch >= 
outputArity) {
+                                correctBranchValue = false;
+                            }
+                        } catch (Exception e) {
+                            correctBranchValue = false;
+                        }
 
                         // Add this tuple to the correct output frame.
-                        FrameUtils.appendToWriter(writers[outputBranch], 
appenders[outputBranch], accessor, i);
+                        if (correctBranchValue) {
+                            FrameUtils.appendToWriter(writers[outputBranch], 
appenders[outputBranch], accessor, i);
+                        } else {
+                            // Need to propagate to all the branches?
+                            if (!propageToAllBranchAsDefault) {
+                                
FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], 
accessor, i);
+                            } else {
+                                for (int j = 0; j < outputArity; j++) {
+                                    FrameUtils.appendToWriter(writers[j], 
appenders[j], accessor, i);
+                                }
+                            }
+                        }
+
                     }
                 }
 

Reply via email to