This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 79bb7f9ae22bb981bee6abd83171cd98cd72c0fa
Author: Padma Penumarthy <[email protected]>
AuthorDate: Mon May 14 15:01:25 2018 -0700

    DRILL-6411: Make batch memory sizing logs uniform across all operators
---
 .../physical/impl/flatten/FlattenRecordBatch.java   | 11 +++++------
 .../exec/physical/impl/join/LateralJoinBatch.java   | 20 ++++++++++++++++++++
 .../exec/physical/impl/join/MergeJoinBatch.java     | 21 +++++++++++++++++++++
 .../physical/impl/unnest/UnnestRecordBatch.java     |  5 +++--
 .../exec/record/AbstractBinaryRecordBatch.java      | 15 ---------------
 .../drill/exec/record/JoinBatchMemoryManager.java   |  5 -----
 .../apache/drill/exec/record/RecordBatchSizer.java  | 20 ++++++++++++--------
 7 files changed, 61 insertions(+), 36 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index d57246d..1a56265 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -157,10 +157,7 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), 
getOutputRowCount()));
 
-      logger.debug("incoming batch size : {}", getRecordBatchSizer());
-
-      logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output 
rowCount : {}",
-        outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
+      logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
 
       updateIncomingStats();
     }
@@ -263,6 +260,8 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
 
     flattenMemoryManager.updateOutgoingStats(outputRecords);
 
+    logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+
     // Get the final outcome based on hasRemainder since that will determine 
if all the incoming records were
     // consumed in current output batch or not
     return getFinalOutcome(hasRemainder);
@@ -513,11 +512,11 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, 
flattenMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, 
flattenMemoryManager.getTotalOutputRecords());
 
-    logger.debug("input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+    logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, 
 avg row bytes : {}, record count : {}",
       flattenMemoryManager.getNumIncomingBatches(), 
flattenMemoryManager.getAvgInputBatchSize(),
       flattenMemoryManager.getAvgInputRowWidth(), 
flattenMemoryManager.getTotalInputRecords());
 
-    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+    logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, 
 avg row bytes : {}, record count : {}",
       flattenMemoryManager.getNumOutgoingBatches(), 
flattenMemoryManager.getAvgOutputBatchSize(),
       flattenMemoryManager.getAvgOutputRowWidth(), 
flattenMemoryManager.getTotalOutputRecords());
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 6425b29..b7e40fe 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -160,6 +160,23 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   @Override
   public void close() {
     updateBatchMemoryManagerStats();
+
+    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes 
: {},  avg row bytes : {}, record count : {}",
+      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+
     super.close();
   }
 
@@ -620,6 +637,7 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     batchMemoryManager.updateOutgoingStats(outputIndex);
+    logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
     logger.debug("Number of records emitted: " + outputIndex);
 
     // Update the output index for next output batch to zero
@@ -854,6 +872,8 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
     // a new output batch with new incoming then it will not cause any problem 
since outputIndex will be 0
     final int newOutputRowCount = batchMemoryManager.update(inputIndex, 
outputIndex);
 
+    logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : 
"right", batchMemoryManager.getRecordBatchSizer(inputIndex));
+
     if (useMemoryManager) {
       maxOutputRowCount = newOutputRowCount;
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index ffcbae3..2f91b46 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -122,6 +122,7 @@ public class MergeJoinBatch extends 
AbstractBinaryRecordBatch<MergeJoinPOP> {
     @Override
     public void update(int inputIndex) {
       status.setTargetOutputRowCount(super.update(inputIndex, 
status.getOutPosition()));
+      logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" 
: "right", getRecordBatchSizer(inputIndex));
     }
   }
 
@@ -266,12 +267,32 @@ public class MergeJoinBatch extends 
AbstractBinaryRecordBatch<MergeJoinPOP> {
       Preconditions.checkArgument(!vw.isHyper());
       vw.getValueVector().getMutator().setValueCount(getRecordCount());
     }
+
+    logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+
     batchMemoryManager.updateOutgoingStats(getRecordCount());
   }
 
   @Override
   public void close() {
     updateBatchMemoryManagerStats();
+
+    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg 
bytes : {},  avg row bytes : {}, record count : {}",
+      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes 
: {},  avg row bytes : {}, record count : {}",
+      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
+
     super.close();
     leftIterator.close();
     rightIterator.close();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 7e6b141..24b4280 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -301,6 +301,7 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
     // entire incoming records has been unnested. If the entire records has been
     // unnested, we return EMIT and any blocking operators in the pipeline will
     // unblock.
+    logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
     return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT;
   }
 
@@ -410,11 +411,11 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, 
memoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, 
memoryManager.getTotalOutputRecords());
 
-    logger.debug("input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+    logger.debug("BATCH_STATS, incoming aggregate: batch count : {}, avg batch 
bytes : {},  avg row bytes : {}, record count : {}",
         memoryManager.getNumIncomingBatches(), 
memoryManager.getAvgInputBatchSize(),
         memoryManager.getAvgInputRowWidth(), 
memoryManager.getTotalInputRecords());
 
-    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
+    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg batch 
bytes : {},  avg row bytes : {}, record count : {}",
         memoryManager.getNumOutgoingBatches(), 
memoryManager.getAvgOutputBatchSize(),
         memoryManager.getAvgOutputRowWidth(), 
memoryManager.getTotalOutputRecords());
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 70be9b5..6bd3969 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -125,20 +125,5 @@ public abstract class AbstractBinaryRecordBatch<T extends 
PhysicalOperator> exte
     stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_RECORD_COUNT,
       batchMemoryManager.getTotalOutputRecords());
 
-    logger.debug("left input: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
-      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-    logger.debug("right input: batch count : {}, avg batch bytes : {},  avg 
row bytes : {}, record count : {}",
-      
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-      
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-    logger.debug("output: batch count : {}, avg batch bytes : {},  avg row 
bytes : {}, record count : {}",
-      batchMemoryManager.getNumOutgoingBatches(), 
batchMemoryManager.getAvgOutputBatchSize(),
-      batchMemoryManager.getAvgOutputRowWidth(), 
batchMemoryManager.getTotalOutputRecords());
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index fbf8bb4..c162bbf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -48,12 +48,10 @@ public class JoinBatchMemoryManager extends 
RecordBatchMemoryManager {
       case LEFT_INDEX:
         setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming));
         leftRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
-        logger.debug("left incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
         break;
       case RIGHT_INDEX:
         setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming));
         rightRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
-        logger.debug("right incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
       default:
         break;
     }
@@ -85,9 +83,6 @@ public class JoinBatchMemoryManager extends 
RecordBatchMemoryManager {
     // set the new row width
     setOutgoingRowWidth(newOutgoingRowWidth);
 
-    logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output 
rowCount : {}",
-      getOutputBatchSize(), getOutgoingRowWidth(), getOutputRowCount());
-
     return adjustOutputRowCount(outputPosition + numOutputRowsRemaining);
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index d4da171..d30f565 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -452,7 +452,7 @@ public class RecordBatchSizer {
            .append(", per-array: ")
            .append(cardinality);
       }
-      buf.append("Per entry: std data size: ")
+      buf.append(", Per entry: std data size: ")
          .append(getStdDataSizePerEntry())
          .append(", std net size: ")
          .append(getStdNetSizePerEntry())
@@ -758,12 +758,8 @@ public class RecordBatchSizer {
   @Override
   public String toString() {
     StringBuilder buf = new StringBuilder();
-    buf.append("Actual batch schema & sizes {\n");
-    for (ColumnSize colSize : columnSizes.values()) {
-      buf.append("  ");
-      buf.append(colSize.toString());
-      buf.append("\n");
-    }
+
+    buf.append("Batch size: {");
     buf.append( "  Records: " );
     buf.append(rowCount);
     buf.append(", Total size: ");
@@ -776,7 +772,15 @@ public class RecordBatchSizer {
     buf.append(netRowWidth);
     buf.append(", Density: ");
     buf.append(avgDensity);
-    buf.append("%}");
+    buf.append("% }\n");
+
+    buf.append("Batch schema & sizes: {\n");
+    for (ColumnSize colSize : columnSizes.values()) {
+      buf.append("  ");
+      buf.append(colSize.toString());
+      buf.append(" }\n");
+    }
+    buf.append(" }\n");
     return buf.toString();
   }
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to