[
https://issues.apache.org/jira/browse/DRILL-7583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037562#comment-17037562
]
ASF GitHub Bot commented on DRILL-7583:
---------------------------------------
paul-rogers commented on pull request #1981: DRILL-7583: Remove STOP status
from operator outcome
URL: https://github.com/apache/drill/pull/1981#discussion_r379849068
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
##########
@@ -1344,74 +1483,95 @@ private void updateStats() {
stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
- stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
+ stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in
+ // case no
+ // spill
stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
}
/**
- * Get the hash table iterator that is created for the build side of the hash join if
- * this hash join was instantiated as a row-key join.
- * @return hash table iterator or null if this hash join was not a row-key join or if it
- * was a row-key join but the build has not yet completed.
+ * Get the hash table iterator that is created for the build side of the hash
+ * join if this hash join was instantiated as a row-key join.
+ *
+ * @return hash table iterator or null if this hash join was not a row-key
+ * join or if it was a row-key join but the build has not yet
+ * completed.
*/
@Override
public Pair<ValueVector, Integer> nextRowKeyBatch() {
if (buildComplete) {
- // partition 0 because Row Key Join has only a single partition - no spilling
+ // partition 0 because Row Key Join has only a single partition - no
+ // spilling
Pair<VectorContainer, Integer> pp = partitions[0].nextBatch();
if (pp != null) {
VectorWrapper<?> vw = Iterables.get(pp.getLeft(), 0);
ValueVector vv = vw.getValueVector();
return Pair.of(vv, pp.getRight());
}
- } else if(partitions == null && firstOutputBatch) { //if there is data coming to right(build) side in build Schema stage, use it.
+ } else if (partitions == null && firstOutputBatch) { // if there is data
+ // coming to
+ // right(build) side in
+ // build Schema stage,
+ // use it.
firstOutputBatch = false;
- if ( right.getRecordCount() > 0 ) {
+ if (right.getRecordCount() > 0) {
VectorWrapper<?> vw = Iterables.get(right, 0);
ValueVector vv = vw.getValueVector();
- return Pair.of(vv, right.getRecordCount()-1);
+ return Pair.of(vv, right.getRecordCount() - 1);
}
}
return null;
}
- @Override // implement RowKeyJoin interface
+ @Override // implement RowKeyJoin interface
public boolean hasRowKeyBatch() {
return buildComplete;
}
- @Override // implement RowKeyJoin interface
+ @Override // implement RowKeyJoin interface
public BatchState getBatchState() {
return state;
}
- @Override // implement RowKeyJoin interface
+ @Override // implement RowKeyJoin interface
public void setBatchState(BatchState newState) {
state = newState;
}
@Override
- public void killIncoming(boolean sendUpstream) {
+ protected void cancelIncoming() {
wasKilled = true;
- probeBatch.kill(sendUpstream);
- buildBatch.kill(sendUpstream);
+ probeBatch.cancel();
+ buildBatch.cancel();
}
public void updateMetrics() {
- stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
-
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
- stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
-
- stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, batchMemoryManager.getNumOutgoingBatches());
- stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, batchMemoryManager.getAvgOutputBatchSize());
- stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth());
- stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords());
+ stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT,
+ batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
+ stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES,
+ batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
+ stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES,
+ batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
+ stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT,
+ batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
+
+ stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT,
+ batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
+ stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES,
+ batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
+ stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES,
+ batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
+ stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT,
+ batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
+
+ stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT,
+ batchMemoryManager.getNumOutgoingBatches());
+ stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES,
+ batchMemoryManager.getAvgOutputBatchSize());
+ stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES,
+ batchMemoryManager.getAvgOutputRowWidth());
+ stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT,
+ batchMemoryManager.getTotalOutputRecords());
Review comment:
Unintended change. Reverted.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Remove STOP status in favor of fail-fast
> ----------------------------------------
>
> Key: DRILL-7583
> URL: https://issues.apache.org/jira/browse/DRILL-7583
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.17.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Priority: Major
> Fix For: 1.18.0
>
>
> The original error solution was a complex process of a) setting a failed
> flag, b) telling all upstream operators they have failed, c) returning a
> {{STOP}} status. Drill has long supported a "fail-fast" error path based on
> throwing an exception; relying on the fragment executor to clean up the
> operator stack. Recent revisions have converted most operators to use the
> simpler fail-fast strategy based on throwing an exception instead of using
> the older {{STOP}} approach. This change simply removes the old, now-unused
> {{STOP}} based path.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)