amansinha100 closed pull request #1562: DRILL-6882: Handle the cases where 
RowKeyJoin's left pipeline being called multiple times.
URL: https://github.com/apache/drill/pull/1562
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise show it once a pull request from a fork is merged):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
index 9fbef972621..4f73b00b524 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
@@ -23,10 +23,20 @@
 
 public class IteratorValidator extends AbstractSingle{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IteratorValidator.class);
+  /* isRepeatable flag will be set to true if this validator is created by a 
Repeatable pipeline.
+   * In a repeatable pipeline some state transitions are valid i.e downstream 
operator
+   * can call the upstream operator even after receiving NONE.
+   */
+  public final boolean isRepeatable;
 
-  public IteratorValidator(PhysicalOperator child) {
+  public IteratorValidator(PhysicalOperator child, boolean repeatable) {
     super(child);
     setCost(child.getCost());
+    this.isRepeatable = repeatable;
+  }
+
+  public IteratorValidator(PhysicalOperator child) {
+    this(child, false);
   }
 
   @Override
@@ -36,7 +46,7 @@ public IteratorValidator(PhysicalOperator child) {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new IteratorValidator(child);
+    return new IteratorValidator(child, isRepeatable);
   }
 
   @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index 941f32144b3..2910da5858b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -132,6 +132,11 @@ public IterOutcome innerNext() {
         return IterOutcome.OK;
       }
 
+      if (rightUpstream == IterOutcome.NONE) {
+        rkJoinState = RowKeyJoinState.DONE;
+        state = BatchState.DONE;
+        return rightUpstream;
+      }
       rightUpstream = next(right);
 
       logger.debug("right input IterOutcome: {}", rightUpstream);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 1ea3895735b..5c70f5d2ecf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -47,6 +47,9 @@
   /** For logging/debuggability only. */
   private static volatile int instanceCount;
 
+  /** @see org.apache.drill.exec.physical.config.IteratorValidator */
+  private final boolean isRepeatable;
+
   /** For logging/debuggability only. */
   private final int instNum;
   {
@@ -102,12 +105,17 @@
    */
   private boolean validateBatches;
 
-  public IteratorValidatorBatchIterator(RecordBatch incoming) {
+  public IteratorValidatorBatchIterator(RecordBatch incoming, boolean 
isRepeatable) {
     this.incoming = incoming;
     batchTypeName = incoming.getClass().getSimpleName();
+    this.isRepeatable = isRepeatable;
 
     // (Log construction and close() at same level to bracket instance's 
activity.)
-    logger.trace( "[#{}; on {}]: Being constructed.", instNum, batchTypeName);
+    logger.trace( "[#{}; on {}; repeatable: {}]: Being constructed.", instNum, 
batchTypeName, isRepeatable);
+  }
+
+  public IteratorValidatorBatchIterator(RecordBatch incoming) {
+    this(incoming, false);
   }
 
 
@@ -217,7 +225,7 @@ public IterOutcome next() {
                 instNum, batchTypeName, exceptionState, batchState));
       }
       // (Note:  This could use validationState.)
-      if (batchState == NONE || batchState == STOP) {
+      if ((!isRepeatable && batchState == NONE) || batchState == STOP) {
         throw new IllegalStateException(
             String.format(
                 "next() [on #%d, %s] called again after it returned %s."
@@ -256,8 +264,10 @@ public IterOutcome next() {
         case NONE:
           // NONE is allowed even without seeing a OK_NEW_SCHEMA. Such NONE is 
called
           // FAST NONE.
-          // NONE moves to terminal high-level state.
-          validationState = ValidationState.TERMINAL;
+          // NONE moves to TERMINAL high-level state if NOT repeatable.
+          if (!isRepeatable) {
+            validationState = ValidationState.TERMINAL;
+          }
           break;
         case STOP:
           // STOP is allowed at any time, except if already terminated (checked
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 4dc58e51b86..b7be8ab6668 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -37,7 +37,7 @@ public IteratorValidatorBatchIterator 
getBatch(ExecutorFragmentContext context,
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     RecordBatch child = children.iterator().next();
-    IteratorValidatorBatchIterator iter = new 
IteratorValidatorBatchIterator(child);
+    IteratorValidatorBatchIterator iter = new 
IteratorValidatorBatchIterator(child, config.isRepeatable);
     boolean validateBatches = 
context.getOptions().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) ||
                               
context.getConfig().getBoolean(ExecConstants.ENABLE_VECTOR_VALIDATION);
     iter.enableBatchValidation(validateBatches);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
index 20eba161508..6d86fb3d26b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
@@ -25,6 +25,7 @@
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
@@ -32,6 +33,17 @@
     AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, 
ExecutionSetupException> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IteratorValidatorInjector.class);
 
+  /* This flag when set creates all the validators as repeatable validators */
+  private final boolean isRepeatablePipeline;
+
+  public IteratorValidatorInjector() {
+    this(false);
+  }
+
+  public IteratorValidatorInjector(boolean repeatablePipeline) {
+    this.isRepeatablePipeline = repeatablePipeline;
+  }
+
   public static FragmentRoot rewritePlanWithIteratorValidator(FragmentContext 
context, FragmentRoot root) throws ExecutionSetupException {
     IteratorValidatorInjector inject = new IteratorValidatorInjector();
     PhysicalOperator newOp = root.accept(inject, context);
@@ -60,11 +72,24 @@ public PhysicalOperator visitOp(PhysicalOperator op, 
FragmentContext context) th
     List<PhysicalOperator> newChildren = Lists.newArrayList();
     PhysicalOperator newOp = op;
 
+    if (op instanceof RowKeyJoinPOP) {
+      /* create a RepeatablePipeline for the left side of RowKeyJoin */
+      PhysicalOperator left = new IteratorValidator(((RowKeyJoinPOP) 
op).getLeft()
+                                   .accept(new 
IteratorValidatorInjector(true), context), true);
+      left.setOperatorId(op.getOperatorId() + 1000);
+      newChildren.add(left);
+      /* right pipeline is not repeatable pipeline */
+      PhysicalOperator right = new IteratorValidator(((RowKeyJoinPOP) 
op).getRight()
+              .accept(this, context));
+      right.setOperatorId(op.getOperatorId() + 1000);
+      newChildren.add(right);
+    } else {
     /* Get the list of child operators */
-    for (PhysicalOperator child : op) {
-      PhysicalOperator validator = new IteratorValidator(child.accept(this, 
context));
-      validator.setOperatorId(op.getOperatorId() + 1000);
-      newChildren.add(validator);
+      for (PhysicalOperator child : op) {
+        PhysicalOperator validator = new IteratorValidator(child.accept(this, 
context), this.isRepeatablePipeline);
+        validator.setOperatorId(op.getOperatorId() + 1000);
+        newChildren.add(validator);
+      }
     }
 
     /* Inject trace operator */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to