This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 43562adc3 TEZ-4521: Partition stats should be always uncompressed size 
(#317) (okumin reviewed by Laszlo Bodor)
43562adc3 is described below

commit 43562adc38117ae191650fe54f8344e635dcc890
Author: okumin <g...@okumin.com>
AuthorDate: Wed Nov 29 00:33:09 2023 +0900

    TEZ-4521: Partition stats should be always uncompressed size (#317) (okumin 
reviewed by Laszlo Bodor)
---
 .../library/vertexmanager/ShuffleVertexManagerBase.java   |  2 ++
 .../runtime/library/common/sort/impl/ExternalSorter.java  |  1 +
 .../runtime/library/common/sort/impl/PipelinedSorter.java |  6 +++---
 .../library/common/sort/impl/dflt/DefaultSorter.java      |  6 +++---
 .../vertexmanager/TestShuffleVertexManagerUtils.java      | 15 +++------------
 5 files changed, 12 insertions(+), 18 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
index c6264fb2f..1d55c7194 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
@@ -130,7 +130,9 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
     final BitSet finishedTaskSet;
     int numTasks;
     int numVMEventsReceived;
+    // The total uncompressed size
     long outputSize;
+    // The uncompressed size of each partition. The size might not be precise
     int[] statsInMB;
     EdgeManagerPluginDescriptor newDescriptor;
 
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 758c06979..232d96430 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -131,6 +131,7 @@ public abstract class ExternalSorter {
   protected final boolean cleanup;
 
   protected OutputStatisticsReporter statsReporter;
+  // uncompressed size for each partition
   protected final long[] partitionStats;
   protected final boolean finalMergeEnabled;
   protected final boolean sendEmptyPartitionDetails;
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 08786c9b2..067dcca0c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -622,7 +622,7 @@ public class PipelinedSorter extends ExternalSorter {
             new TezIndexRecord(segmentStart, rawLength, partLength);
         spillRec.putIndex(rec, i);
         if (!isFinalMergeEnabled() && reportPartitionStats()) {
-          partitionStats[i] += partLength;
+          partitionStats[i] += rawLength;
         }
       }
 
@@ -747,7 +747,7 @@ public class PipelinedSorter extends ExternalSorter {
         TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, 
localFs);
         if (reportPartitionStats()) {
           for (int i = 0; i < spillRecord.size(); i++) {
-            partitionStats[i] += spillRecord.getIndex(i).getPartLength();
+            partitionStats[i] += spillRecord.getIndex(i).getRawLength();
           }
         }
         numShuffleChunks.setValue(numSpills);
@@ -832,7 +832,7 @@ public class PipelinedSorter extends ExternalSorter {
             new TezIndexRecord(segmentStart, rawLength, partLength);
         spillRec.putIndex(rec, parts);
         if (reportPartitionStats()) {
-          partitionStats[parts] += partLength;
+          partitionStats[parts] += rawLength;
         }
       }
 
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 7c678749b..6354c7cc4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -956,7 +956,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
               new TezIndexRecord(segmentStart, rawLength, partLength);
           spillRec.putIndex(rec, i);
           if (!isFinalMergeEnabled() && reportPartitionStats() && writer != 
null) {
-            partitionStats[i] += partLength;
+            partitionStats[i] += rawLength;
           }
           writer = null;
         } finally {
@@ -1244,7 +1244,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
       }
       if (spillRecord != null && reportPartitionStats()) {
         for(int i=0; i < spillRecord.size(); i++) {
-          partitionStats[i] += spillRecord.getIndex(i).getPartLength();
+          partitionStats[i] += spillRecord.getIndex(i).getRawLength();
         }
       }
       numShuffleChunks.setValue(numSpills);
@@ -1388,7 +1388,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           new TezIndexRecord(segmentStart, rawLength, partLength);
         spillRec.putIndex(rec, parts);
         if (reportPartitionStats()) {
-          partitionStats[parts] += partLength;
+          partitionStats[parts] += rawLength;
         }
       }
       numShuffleChunks.setValue(1); //final merge has happened
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
index 44adc462b..5d1509754 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.library.vertexmanager;
 
 
 import com.google.protobuf.ByteString;
+import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.common.ReflectionUtils;
@@ -125,10 +126,10 @@ public class TestShuffleVertexManagerUtils {
       long uncompressedTotalSize, String vertexName, boolean 
reportDetailedStats)
       throws IOException {
     ByteBuffer payload;
-    long totalSize = 0;
+    final long totalSize;
     // Use partition sizes to compute the total size.
     if (partitionSizes != null) {
-      totalSize = estimatedUncompressedSum(partitionSizes);
+      totalSize = Arrays.stream(partitionSizes).sum();
     } else {
       totalSize = uncompressedTotalSize;
     }
@@ -169,16 +170,6 @@ public class TestShuffleVertexManagerUtils {
     return vmEvent;
   }
 
-  // Assume 3 : 1 compression ratio to estimate the total size
-  // of all partitions.
-  long estimatedUncompressedSum(long[] partitionStats) {
-    long sum = 0;
-    for (long partition : partitionStats) {
-      sum += partition;
-    }
-    return sum * 3;
-  }
-
   public static TaskAttemptIdentifier createTaskAttemptIdentifier(String 
vName, int tId) {
     VertexIdentifier mockVertex = mock(VertexIdentifier.class);
     when(mockVertex.getName()).thenReturn(vName);

Reply via email to