ihuzenko commented on a change in pull request #1981: DRILL-7583: Remove STOP status from operator outcome
URL: https://github.com/apache/drill/pull/1981#discussion_r379477848
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
##########

@@ -1344,74 +1483,95 @@ private void updateStats() {
     stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
+    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in
+                                                                    // case no
+                                                                    // spill
     stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }

   /**
-   * Get the hash table iterator that is created for the build side of the hash join if
-   * this hash join was instantiated as a row-key join.
-   * @return hash table iterator or null if this hash join was not a row-key join or if it
-   * was a row-key join but the build has not yet completed.
+   * Get the hash table iterator that is created for the build side of the hash
+   * join if this hash join was instantiated as a row-key join.
+   *
+   * @return hash table iterator or null if this hash join was not a row-key
+   *         join or if it was a row-key join but the build has not yet
+   *         completed.
    */
   @Override
   public Pair<ValueVector, Integer> nextRowKeyBatch() {
     if (buildComplete) {
-      // partition 0 because Row Key Join has only a single partition - no spilling
+      // partition 0 because Row Key Join has only a single partition - no
+      // spilling
       Pair<VectorContainer, Integer> pp = partitions[0].nextBatch();
       if (pp != null) {
         VectorWrapper<?> vw = Iterables.get(pp.getLeft(), 0);
         ValueVector vv = vw.getValueVector();
         return Pair.of(vv, pp.getRight());
       }
-    } else if(partitions == null && firstOutputBatch) { //if there is data coming to right(build) side in build Schema stage, use it.
+    } else if (partitions == null && firstOutputBatch) { // if there is data
+                                                         // coming to
+                                                         // right(build) side in
+                                                         // build Schema stage,
+                                                         // use it.
       firstOutputBatch = false;
-      if ( right.getRecordCount() > 0 ) {
+      if (right.getRecordCount() > 0) {
        VectorWrapper<?> vw = Iterables.get(right, 0);
        ValueVector vv = vw.getValueVector();
-        return Pair.of(vv, right.getRecordCount()-1);
+        return Pair.of(vv, right.getRecordCount() - 1);
      }
    }
    return null;
  }

-  @Override  // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
  public boolean hasRowKeyBatch() {
    return buildComplete;
  }

-  @Override  // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
  public BatchState getBatchState() {
    return state;
  }

-  @Override  // implement RowKeyJoin interface
+  @Override // implement RowKeyJoin interface
  public void setBatchState(BatchState newState) {
    state = newState;
  }

  @Override
-  public void killIncoming(boolean sendUpstream) {
+  protected void cancelIncoming() {
    wasKilled = true;
-    probeBatch.kill(sendUpstream);
-    buildBatch.kill(sendUpstream);
+    probeBatch.cancel();
+    buildBatch.cancel();
  }

  public void updateMetrics() {
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
-
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
-    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
-
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, batchMemoryManager.getNumOutgoingBatches());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, batchMemoryManager.getAvgOutputBatchSize());
-    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth());
-    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords());
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT,
+        batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
+        batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
+        batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
+        batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
+    stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT,
+        batchMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
+        batchMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES,
+        batchMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT,
+        batchMemoryManager.getTotalOutputRecords());

Review comment:

With a little trick it may be:

```java
public void updateMetrics() {
  RecordBatchMemoryManager bmm = batchMemoryManager;
  stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, bmm.getNumIncomingBatches(LEFT_INDEX));
  stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES, bmm.getAvgInputBatchSize(LEFT_INDEX));
  stats.setLongStat(Metric.LEFT_AVG_INPUT_ROW_BYTES, bmm.getAvgInputRowWidth(LEFT_INDEX));
  stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, bmm.getTotalInputRecords(LEFT_INDEX));

  stats.setLongStat(Metric.RIGHT_INPUT_BATCH_COUNT, bmm.getNumIncomingBatches(RIGHT_INDEX));
  stats.setLongStat(Metric.RIGHT_AVG_INPUT_BATCH_BYTES, bmm.getAvgInputBatchSize(RIGHT_INDEX));
  stats.setLongStat(Metric.RIGHT_AVG_INPUT_ROW_BYTES, bmm.getAvgInputRowWidth(RIGHT_INDEX));
  stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, bmm.getTotalInputRecords(RIGHT_INDEX));

  stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, bmm.getNumOutgoingBatches());
  stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, bmm.getAvgOutputBatchSize());
  stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, bmm.getAvgOutputRowWidth());
  stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, bmm.getTotalOutputRecords());
}
```
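The trick above is just a short local alias: it keeps each `stats.setLongStat(...)` call on one line instead of letting the formatter wrap the long `batchMemoryManager` field reference, as it does in the diff above. For readers outside the Drill codebase, here is a minimal, self-contained sketch of the same pattern; every name in it (`AliasTrickSketch`, `Stats`, `RecordBatchMemoryManagerStandIn`, the two-value `Metric` enum) is an illustrative stand-in, not Drill's API.

```java
// Illustrative stand-ins only, not Drill classes; the point is the local alias.
import java.util.EnumMap;
import java.util.Map;

public class AliasTrickSketch {

  enum Metric { LEFT_INPUT_BATCH_COUNT, RIGHT_INPUT_BATCH_COUNT }

  // Stand-in for OperatorStats: remembers the last value set per metric.
  static class Stats {
    final Map<Metric, Long> values = new EnumMap<>(Metric.class);

    void setLongStat(Metric metric, long value) {
      values.put(metric, value);
    }
  }

  // Stand-in for RecordBatchMemoryManager with a deliberately long name.
  static class RecordBatchMemoryManagerStandIn {
    long getNumIncomingBatches(int index) {
      return 42L + index;
    }
  }

  private final Stats stats = new Stats();
  private final RecordBatchMemoryManagerStandIn batchMemoryManager =
      new RecordBatchMemoryManagerStandIn();

  void updateMetrics() {
    // The "little trick": a short local alias for the long field, so each
    // setLongStat(...) call fits on a single line within the line-length limit.
    RecordBatchMemoryManagerStandIn bmm = batchMemoryManager;
    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, bmm.getNumIncomingBatches(0));
    stats.setLongStat(Metric.RIGHT_INPUT_BATCH_COUNT, bmm.getNumIncomingBatches(1));
  }

  public static void main(String[] args) {
    AliasTrickSketch sketch = new AliasTrickSketch();
    sketch.updateMetrics();
    // Prints {LEFT_INPUT_BATCH_COUNT=42, RIGHT_INPUT_BATCH_COUNT=43}
    System.out.println(sketch.stats.values);
  }
}
```

The alias is free at runtime (it is the same object reference); its only effect is on source formatting.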