This is an automated email from the ASF dual-hosted git repository. boaz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 79bb7f9ae22bb981bee6abd83171cd98cd72c0fa Author: Padma Penumarthy <[email protected]> AuthorDate: Mon May 14 15:01:25 2018 -0700 DRILL-6411: Make batch memory sizing logs uniform across all operators --- .../physical/impl/flatten/FlattenRecordBatch.java | 11 +++++------ .../exec/physical/impl/join/LateralJoinBatch.java | 20 ++++++++++++++++++++ .../exec/physical/impl/join/MergeJoinBatch.java | 21 +++++++++++++++++++++ .../physical/impl/unnest/UnnestRecordBatch.java | 5 +++-- .../exec/record/AbstractBinaryRecordBatch.java | 15 --------------- .../drill/exec/record/JoinBatchMemoryManager.java | 5 ----- .../apache/drill/exec/record/RecordBatchSizer.java | 20 ++++++++++++-------- 7 files changed, 61 insertions(+), 36 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index d57246d..1a56265 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -157,10 +157,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // i.e. all rows fit within memory budget. setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); - logger.debug("incoming batch size : {}", getRecordBatchSizer()); - - logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}", - outputBatchSize, avgOutgoingRowWidth, getOutputRowCount()); + logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer()); updateIncomingStats(); } @@ -263,6 +260,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { flattenMemoryManager.updateOutgoingStats(outputRecords); + logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); + // Get the final outcome based on hasRemainder since that will determine if all the incoming records were // consumed in current output batch or not return getFinalOutcome(hasRemainder); @@ -513,11 +512,11 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth()); stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords()); - logger.debug("input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(), flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords()); - logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(), flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 6425b29..b7e40fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -160,6 +160,23 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> @Override public void close() { updateBatchMemoryManagerStats(); + + logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + super.close(); } @@ -620,6 +637,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> container.buildSchema(BatchSchema.SelectionVectorMode.NONE); batchMemoryManager.updateOutgoingStats(outputIndex); + logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); logger.debug("Number of records emitted: " + outputIndex); // Update the output index for next output batch to zero @@ -854,6 +872,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0 final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex); + logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", batchMemoryManager.getRecordBatchSizer(inputIndex)); + if (useMemoryManager) { maxOutputRowCount = newOutputRowCount; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index ffcbae3..2f91b46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -122,6 +122,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { @Override public void update(int inputIndex) { status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition())); + logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex)); } } @@ -266,12 +267,32 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { Preconditions.checkArgument(!vw.isHyper()); vw.getValueVector().getMutator().setValueCount(getRecordCount()); } + + logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); + batchMemoryManager.updateOutgoingStats(getRecordCount()); } @Override public void close() { updateBatchMemoryManagerStats(); + + logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); + super.close(); leftIterator.close(); rightIterator.close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index 7e6b141..24b4280 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -301,6 +301,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO // entire incoming recods has been unnested. If the entire records has been // unnested, we return EMIT and any blocking operators in the pipeline will // unblock. + logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT; } @@ -410,11 +411,11 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, memoryManager.getAvgOutputRowWidth()); stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, memoryManager.getTotalOutputRecords()); - logger.debug("input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + logger.debug("BATCH_STATS, incoming aggregate: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(), memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords()); - logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(), memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java index 70be9b5..6bd3969 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java @@ -125,20 +125,5 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords()); - logger.debug("left input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); - - logger.debug("right input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); - - logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), - batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java index fbf8bb4..c162bbf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java @@ -48,12 +48,10 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager { case LEFT_INDEX: setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming)); leftRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize(); - logger.debug("left incoming batch size : {}", getRecordBatchSizer(inputIndex)); break; case RIGHT_INDEX: setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming)); rightRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize(); - logger.debug("right incoming batch size : {}", getRecordBatchSizer(inputIndex)); default: break; } @@ -85,9 +83,6 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager { // set the new row width setOutgoingRowWidth(newOutgoingRowWidth); - logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}", - getOutputBatchSize(), getOutgoingRowWidth(), getOutputRowCount()); - return adjustOutputRowCount(outputPosition + numOutputRowsRemaining); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java index d4da171..d30f565 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java @@ -452,7 +452,7 @@ public class RecordBatchSizer { .append(", per-array: ") .append(cardinality); } - buf.append("Per entry: std data size: ") + buf.append(", Per entry: std data size: ") .append(getStdDataSizePerEntry()) .append(", std net size: ") .append(getStdNetSizePerEntry()) @@ -758,12 +758,8 @@ public class RecordBatchSizer { @Override public String toString() { StringBuilder buf = new StringBuilder(); - buf.append("Actual batch schema & sizes {\n"); - for (ColumnSize colSize : columnSizes.values()) { - buf.append(" "); - buf.append(colSize.toString()); - buf.append("\n"); - } + + buf.append("Batch size: {"); buf.append( " Records: " ); buf.append(rowCount); buf.append(", Total size: "); @@ -776,7 +772,15 @@ public class RecordBatchSizer { buf.append(netRowWidth); buf.append(", Density: "); buf.append(avgDensity); - buf.append("%}"); + buf.append("% }\n"); + + buf.append("Batch schema & sizes: {\n"); + for (ColumnSize colSize : columnSizes.values()) { + buf.append(" "); + buf.append(colSize.toString()); + buf.append(" }\n"); + } + buf.append(" }\n"); return buf.toString(); } -- To stop receiving notification emails like this one, please contact [email protected].
