[
https://issues.apache.org/jira/browse/DRILL-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509231#comment-16509231
]
ASF GitHub Bot commented on DRILL-6478:
---------------------------------------
ilooner closed pull request #1310: DRILL-6478: enhance debug logs for batch
sizing
URL: https://github.com/apache/drill/pull/1310
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index be44c94c0a..2f92d52725 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -157,9 +157,7 @@ public void update() {
// i.e. all rows fit within memory budget.
setOutputRowCount(Math.min(columnSize.getElementCount(),
getOutputRowCount()));
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
- }
+ logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
updateIncomingStats();
}
@@ -171,6 +169,8 @@ public FlattenRecordBatch(FlattenPOP pop, RecordBatch
incoming, FragmentContext
// get the output batch size from config.
int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);
+
+ logger.debug("BATCH_STATS, configured output batch size: {}",
configuredBatchSize);
}
@Override
@@ -263,7 +263,7 @@ protected IterOutcome doWork() {
flattenMemoryManager.updateOutgoingStats(outputRecords);
if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+ logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}
// Get the final outcome based on hasRemainder since that will determine
if all the incoming records were
@@ -516,14 +516,15 @@ private void updateStats() {
stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES,
flattenMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(Metric.OUTPUT_RECORD_COUNT,
flattenMemoryManager.getTotalOutputRecords());
- logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {},
avg row bytes : {}, record count : {}",
- flattenMemoryManager.getNumIncomingBatches(),
flattenMemoryManager.getAvgInputBatchSize(),
- flattenMemoryManager.getAvgInputRowWidth(),
flattenMemoryManager.getTotalInputRecords());
-
- logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {},
avg row bytes : {}, record count : {}",
- flattenMemoryManager.getNumOutgoingBatches(),
flattenMemoryManager.getAvgOutputBatchSize(),
- flattenMemoryManager.getAvgOutputRowWidth(),
flattenMemoryManager.getTotalOutputRecords());
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes :
{}, avg row bytes : {}, record count : {}",
+ flattenMemoryManager.getNumIncomingBatches(),
flattenMemoryManager.getAvgInputBatchSize(),
+ flattenMemoryManager.getAvgInputRowWidth(),
flattenMemoryManager.getTotalInputRecords());
+ logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes :
{}, avg row bytes : {}, record count : {}",
+ flattenMemoryManager.getNumOutgoingBatches(),
flattenMemoryManager.getAvgOutputBatchSize(),
+ flattenMemoryManager.getAvgOutputRowWidth(),
flattenMemoryManager.getTotalOutputRecords());
+ }
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 4267077aef..428a47ebf3 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -242,10 +242,8 @@ protected boolean prefetchFirstBatchFromBothSides() {
batchMemoryManager.update(LEFT_INDEX, 0);
batchMemoryManager.update(RIGHT_INDEX, 0, true);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming left:\n {}",
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
- logger.debug("BATCH_STATS, incoming right:\n {}",
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
- }
+ logger.debug("BATCH_STATS, incoming left: {}",
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+ logger.debug("BATCH_STATS, incoming right: {}",
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP)
{
state = BatchState.STOP;
@@ -358,7 +356,7 @@ public IterOutcome innerNext() {
batchMemoryManager.updateOutgoingStats(outputRecords);
if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing:\n {}", new
RecordBatchSizer(this));
+ logger.debug("BATCH_STATS, outgoing: {}", new
RecordBatchSizer(this));
}
/* We are here because of one the following
@@ -890,6 +888,7 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext
context,
// get the output batch size from config.
int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left,
right);
+ logger.debug("BATCH_STATS, configured output batch size: {}",
configuredBatchSize);
}
/**
@@ -1004,21 +1003,19 @@ public void close() {
updateMetrics();
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
-
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
-
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes
: {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, incoming aggregate left: batch count : {},
avg bytes : {}, avg row bytes : {}, record count : {}",
+
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+ logger.debug("BATCH_STATS, incoming aggregate right: batch count : {},
avg bytes : {}, avg row bytes : {}, record count : {}",
+
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+ logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
+ batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());
+ }
this.cleanup();
super.close();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index a5c2ae7318..62967a9fa1 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -122,9 +122,7 @@
@Override
public void update(int inputIndex) {
status.setTargetOutputRowCount(super.update(inputIndex,
status.getOutPosition()));
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ?
"left" : "right", getRecordBatchSizer(inputIndex));
- }
+ logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" :
"right", getRecordBatchSizer(inputIndex));
}
}
@@ -132,8 +130,10 @@ protected MergeJoinBatch(MergeJoinPOP popConfig,
FragmentContext context, Record
super(popConfig, context, true, left, right);
// Instantiate the batch memory manager
- final int outputBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
- batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left,
right);
+ final int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left,
right);
+
+ logger.debug("BATCH_STATS, configured output batch size: {}",
configuredBatchSize);
if (popConfig.getConditions().size() == 0) {
throw new UnsupportedOperationException("Merge Join currently does not
support cartesian join. This join operator was configured with 0 conditions");
@@ -271,7 +271,7 @@ private void setRecordCountInContainer() {
}
if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+ logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}
batchMemoryManager.updateOutgoingStats(getRecordCount());
@@ -281,21 +281,23 @@ private void setRecordCountInContainer() {
public void close() {
updateBatchMemoryManagerStats();
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
-
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
-
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes
: {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, incoming aggregate left: batch count : {},
avg bytes : {}, avg row bytes : {}, record count : {}",
+
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+ logger.debug("BATCH_STATS, incoming aggregate right: batch count : {},
avg bytes : {}, avg row bytes : {}, record count : {}",
+
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+ logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
+ batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());
+ }
super.close();
leftIterator.close();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index f4c1900a5f..36b9c9ba86 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -39,9 +39,11 @@
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
@@ -74,6 +76,7 @@ public UnionAllRecordBatch(UnionAll config, List<RecordBatch>
children, Fragment
// get the output batch size from config.
int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new RecordBatchMemoryManager(numInputs,
configuredBatchSize);
+ logger.debug("BATCH_STATS, configured output batch size: {}",
configuredBatchSize);
}
@Override
@@ -168,6 +171,10 @@ private IterOutcome doWork(BatchStatusWrappper
batchStatus, boolean newSchema) t
batchStatus.recordsProcessed += recordCount;
batchMemoryManager.updateOutgoingStats(recordCount);
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+ }
+
if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA;
} else {
@@ -361,6 +368,8 @@ public boolean hasNext() {
if (topStatus.prefetched) {
topStatus.prefetched = false;
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
+ logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex ==
0 ? "left" : "right",
+ batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
return Pair.of(topStatus.outcome, topStatus);
} else {
@@ -378,6 +387,8 @@ public boolean hasNext() {
topStatus.recordsProcessed = 0;
topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
+ logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex
== 0 ? "left" : "right",
+ batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
return Pair.of(outcome, topStatus);
case OUT_OF_MEMORY:
case STOP:
@@ -409,6 +420,20 @@ public void remove() {
public void close() {
super.close();
updateBatchMemoryManagerStats();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, incoming aggregate left: batch count : {},
avg bytes : {}, avg row bytes : {}, record count : {}",
+
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+ logger.debug("BATCH_STATS, incoming aggregate right: batch count : {},
avg bytes : {}, avg row bytes : {}, record count : {}",
+
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+ logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
+ batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());
+ }
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> enhance debug logs for batch sizing
> -----------------------------------
>
> Key: DRILL-6478
> URL: https://issues.apache.org/jira/browse/DRILL-6478
> Project: Apache Drill
> Issue Type: Bug
> Reporter: Padma Penumarthy
> Assignee: Padma Penumarthy
> Priority: Major
> Labels: ready-to-commit
> Fix For: 1.14.0
>
>
> Fix some issues with debug logs so QA scripts work better. Also, add batch
> sizing logs for union all.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)