[ 
https://issues.apache.org/jira/browse/DRILL-6411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476714#comment-16476714
 ] 

ASF GitHub Bot commented on DRILL-6411:
---------------------------------------

Ben-Zvi closed pull request #1260: DRILL-6411: Make batch memory sizing logs 
uniform across all operators
URL: https://github.com/apache/drill/pull/1260
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index d57246d4ce..be44c94c0a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -157,10 +157,9 @@ public void update() {
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), 
getOutputRowCount()));
 
-      logger.debug("incoming batch size : {}", getRecordBatchSizer());
-
-      logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output 
rowCount : {}",
-        outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
+      if (logger.isDebugEnabled()) {
+        logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
+      }
 
       updateIncomingStats();
     }
@@ -263,6 +262,10 @@ protected IterOutcome doWork() {
 
     flattenMemoryManager.updateOutgoingStats(outputRecords);
 
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+    }
+
     // Get the final outcome based on hasRemainder since that will determine 
if all the incoming records were
     // consumed in current output batch or not
     return getFinalOutcome(hasRemainder);
@@ -513,11 +516,11 @@ private void updateStats() {
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, 
flattenMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, 
flattenMemoryManager.getTotalOutputRecords());
 
-    logger.debug("input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+    logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, 
 avg row bytes : {}, record count : {}",
       flattenMemoryManager.getNumIncomingBatches(), 
flattenMemoryManager.getAvgInputBatchSize(),
       flattenMemoryManager.getAvgInputRowWidth(), 
flattenMemoryManager.getTotalInputRecords());
 
-    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+    logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, 
 avg row bytes : {}, record count : {}",
       flattenMemoryManager.getNumOutgoingBatches(), 
flattenMemoryManager.getAvgOutputBatchSize(),
       flattenMemoryManager.getAvgOutputRowWidth(), 
flattenMemoryManager.getTotalOutputRecords());
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 6425b29e09..8ea381bc1c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -160,6 +160,23 @@ public IterOutcome innerNext() {
   @Override
   public void close() {
     updateBatchMemoryManagerStats();
+
+    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes 
: {},  avg row bytes : {}, record count : {}",
+      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+
     super.close();
   }
 
@@ -620,7 +637,10 @@ private void finalizeOutputContainer() {
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     batchMemoryManager.updateOutgoingStats(outputIndex);
-    logger.debug("Number of records emitted: " + outputIndex);
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+      logger.debug("Number of records emitted: " + outputIndex);
+    }
 
     // Update the output index for next output batch to zero
     outputIndex = 0;
@@ -854,6 +874,10 @@ private void updateMemoryManager(int inputIndex) {
     // a new output batch with new incoming then it will not cause any problem 
since outputIndex will be 0
     final int newOutputRowCount = batchMemoryManager.update(inputIndex, 
outputIndex);
 
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" 
: "right", batchMemoryManager.getRecordBatchSizer(inputIndex));
+    }
+
     if (useMemoryManager) {
       maxOutputRowCount = newOutputRowCount;
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index ffcbae3780..9713b70d29 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -122,6 +122,9 @@
     @Override
     public void update(int inputIndex) {
       status.setTargetOutputRowCount(super.update(inputIndex, 
status.getOutPosition()));
+      if (logger.isDebugEnabled()) {
+        logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? 
"left" : "right", getRecordBatchSizer(inputIndex));
+      }
     }
   }
 
@@ -266,12 +269,34 @@ private void setRecordCountInContainer() {
       Preconditions.checkArgument(!vw.isHyper());
       vw.getValueVector().getMutator().setValueCount(getRecordCount());
     }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+    }
+
     batchMemoryManager.updateOutgoingStats(getRecordCount());
   }
 
   @Override
   public void close() {
     updateBatchMemoryManagerStats();
+
+    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes 
: {},  avg row bytes : {}, record count : {}",
+      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+
     super.close();
     leftIterator.close();
     rightIterator.close();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 668a8973f9..c1761c2bd7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -130,10 +130,9 @@ public void update() {
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), 
getOutputRowCount()));
 
-      logger.debug("incoming batch size : {}", getRecordBatchSizer());
-
-      logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output 
rowCount : {}",
-          outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
+      if (logger.isDebugEnabled()) {
+        logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
+      }
 
       updateIncomingStats();
     }
@@ -308,6 +307,9 @@ protected IterOutcome doWork() {
     // entire incoming recods has been unnested. If the entire records has been
     // unnested, we return EMIT and any blocking operators in the pipeline will
     // unblock.
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+    }
     return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT;
   }
 
@@ -409,11 +411,11 @@ private void updateStats() {
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, 
memoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, 
memoryManager.getTotalOutputRecords());
 
-    logger.debug("input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+    logger.debug("BATCH_STATS, incoming aggregate: batch count : {}, avg batch 
bytes : {},  avg row bytes : {}, record count : {}",
         memoryManager.getNumIncomingBatches(), 
memoryManager.getAvgInputBatchSize(),
         memoryManager.getAvgInputRowWidth(), 
memoryManager.getTotalInputRecords());
 
-    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg batch 
bytes : {},  avg row bytes : {}, record count : {}",
         memoryManager.getNumOutgoingBatches(), 
memoryManager.getAvgOutputBatchSize(),
         memoryManager.getAvgOutputRowWidth(), 
memoryManager.getTotalOutputRecords());
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 70be9b5967..6bd3969a99 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -125,20 +125,5 @@ protected void updateBatchMemoryManagerStats() {
     stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_RECORD_COUNT,
       batchMemoryManager.getTotalOutputRecords());
 
-    logger.debug("left input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
-      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-    logger.debug("right input: batch count : {}, avg batch bytes : {},  avg 
row bytes : {}, record count : {}",
-      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
-      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
-      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index fbf8bb444a..c162bbfc5f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -48,12 +48,10 @@ public int update(int inputIndex, int outputPosition) {
       case LEFT_INDEX:
         setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming));
         leftRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
-        logger.debug("left incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
         break;
       case RIGHT_INDEX:
         setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming));
         rightRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
-        logger.debug("right incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
       default:
         break;
     }
@@ -85,9 +83,6 @@ public int update(int inputIndex, int outputPosition) {
     // set the new row width
     setOutgoingRowWidth(newOutgoingRowWidth);
 
-    logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output 
rowCount : {}",
-      getOutputBatchSize(), getOutgoingRowWidth(), getOutputRowCount());
-
     return adjustOutputRowCount(outputPosition + numOutputRowsRemaining);
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index d4da1718b4..d30f565903 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -452,7 +452,7 @@ public String toString() {
            .append(", per-array: ")
            .append(cardinality);
       }
-      buf.append("Per entry: std data size: ")
+      buf.append(", Per entry: std data size: ")
          .append(getStdDataSizePerEntry())
          .append(", std net size: ")
          .append(getStdNetSizePerEntry())
@@ -758,12 +758,8 @@ public static int safeDivide(int num, float denom) {
   @Override
   public String toString() {
     StringBuilder buf = new StringBuilder();
-    buf.append("Actual batch schema & sizes {\n");
-    for (ColumnSize colSize : columnSizes.values()) {
-      buf.append("  ");
-      buf.append(colSize.toString());
-      buf.append("\n");
-    }
+
+    buf.append("Batch size: {");
     buf.append( "  Records: " );
     buf.append(rowCount);
     buf.append(", Total size: ");
@@ -776,7 +772,15 @@ public String toString() {
     buf.append(netRowWidth);
     buf.append(", Density: ");
     buf.append(avgDensity);
-    buf.append("%}");
+    buf.append("% }\n");
+
+    buf.append("Batch schema & sizes: {\n");
+    for (ColumnSize colSize : columnSizes.values()) {
+      buf.append("  ");
+      buf.append(colSize.toString());
+      buf.append(" }\n");
+    }
+    buf.append(" }\n");
     return buf.toString();
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Make batch memory sizing logs uniform across all operators
> ----------------------------------------------------------
>
>                 Key: DRILL-6411
>                 URL: https://issues.apache.org/jira/browse/DRILL-6411
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.13.0
>            Reporter: Padma Penumarthy
>            Assignee: Padma Penumarthy
>            Priority: Major
>             Fix For: 1.14.0
>
>
> Make batch memory sizing logs uniform across all operators so that QA can parse 
> and verify them easily.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to