ihuzenko commented on a change in pull request #1981: DRILL-7583: Remove STOP 
status from operator outcome
URL: https://github.com/apache/drill/pull/1981#discussion_r379477848
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##########
 @@ -1344,74 +1483,95 @@ private void updateStats() {
     stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 
in case no spill
+    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in
+                                                                    // case no
+                                                                    // spill
     stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }
 
   /**
-   * Get the hash table iterator that is created for the build side of the 
hash join if
-   * this hash join was instantiated as a row-key join.
-   * @return hash table iterator or null if this hash join was not a row-key 
join or if it
-   * was a row-key join but the build has not yet completed.
+   * Get the hash table iterator that is created for the build side of the hash
+   * join if this hash join was instantiated as a row-key join.
+   *
+   * @return hash table iterator or null if this hash join was not a row-key
+   *         join or if it was a row-key join but the build has not yet
+   *         completed.
    */
   @Override
   public Pair<ValueVector, Integer> nextRowKeyBatch() {
     if (buildComplete) {
-      // partition 0 because Row Key Join has only a single partition - no 
spilling
+      // partition 0 because Row Key Join has only a single partition - no
+      // spilling
       Pair<VectorContainer, Integer> pp = partitions[0].nextBatch();
       if (pp != null) {
         VectorWrapper<?> vw = Iterables.get(pp.getLeft(), 0);
         ValueVector vv = vw.getValueVector();
         return Pair.of(vv, pp.getRight());
       }
-    } else if(partitions == null && firstOutputBatch) { //if there is data 
coming to right(build) side in build Schema stage, use it.
+    } else if (partitions == null && firstOutputBatch) { // if there is data
+                                                         // coming to
+                                                         // right(build) side 
in
+                                                         // build Schema stage,
+                                                         // use it.
       firstOutputBatch = false;
-      if ( right.getRecordCount() > 0 ) {
+      if (right.getRecordCount() > 0) {
         VectorWrapper<?> vw = Iterables.get(right, 0);
         ValueVector vv = vw.getValueVector();
-        return Pair.of(vv, right.getRecordCount()-1);
+        return Pair.of(vv, right.getRecordCount() - 1);
       }
     }
     return null;
   }
 
-  @Override    // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
   public boolean hasRowKeyBatch() {
     return buildComplete;
   }
 
-  @Override   // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
   public BatchState getBatchState() {
     return state;
   }
 
-  @Override  // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
   public void setBatchState(BatchState newState) {
     state = newState;
   }
 
   @Override
-  public void killIncoming(boolean sendUpstream) {
+  protected void cancelIncoming() {
     wasKilled = true;
-    probeBatch.kill(sendUpstream);
-    buildBatch.kill(sendUpstream);
+    probeBatch.cancel();
+    buildBatch.cancel();
   }
 
   public void updateMetrics() {
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, 
batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, 
batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, 
batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, 
batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
-
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, 
batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, 
batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, 
batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, 
batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
-
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, 
batchMemoryManager.getNumOutgoingBatches());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, 
batchMemoryManager.getAvgOutputBatchSize());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, 
batchMemoryManager.getAvgOutputRowWidth());
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, 
batchMemoryManager.getTotalOutputRecords());
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT,
+        batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
+        batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
+        batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
+        batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT,
+        batchMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES,
+        batchMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalOutputRecords());
 
 Review comment:
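   With a little trick (a short local alias for `batchMemoryManager`, and dropping the `HashJoinBatch.` qualifier from `Metric`), this could perhaps be shortened to: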
   
   ```java
     public void updateMetrics() {
       RecordBatchMemoryManager bmm = batchMemoryManager;
       stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, 
bmm.getNumIncomingBatches(LEFT_INDEX));
       stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES, 
bmm.getAvgInputBatchSize(LEFT_INDEX));
       stats.setLongStat(Metric.LEFT_AVG_INPUT_ROW_BYTES, 
bmm.getAvgInputRowWidth(LEFT_INDEX));
       stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, 
bmm.getTotalInputRecords(LEFT_INDEX));
       
       stats.setLongStat(Metric.RIGHT_INPUT_BATCH_COUNT, 
bmm.getNumIncomingBatches(RIGHT_INDEX));
       stats.setLongStat(Metric.RIGHT_AVG_INPUT_BATCH_BYTES, 
bmm.getAvgInputBatchSize(RIGHT_INDEX));
       stats.setLongStat(Metric.RIGHT_AVG_INPUT_ROW_BYTES, 
bmm.getAvgInputRowWidth(RIGHT_INDEX));
       stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, 
bmm.getTotalInputRecords(RIGHT_INDEX));
       
       stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, 
bmm.getNumOutgoingBatches());
       stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, 
bmm.getAvgOutputBatchSize());
       stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, 
bmm.getAvgOutputRowWidth());
       stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, 
bmm.getTotalOutputRecords());
     }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to