This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 43562adc3 TEZ-4521: Partition stats should be always uncompressed size (#317) (okumin reviewed by Laszlo Bodor)
43562adc3 is described below
commit 43562adc38117ae191650fe54f8344e635dcc890
Author: okumin <[email protected]>
AuthorDate: Wed Nov 29 00:33:09 2023 +0900
TEZ-4521: Partition stats should be always uncompressed size (#317) (okumin reviewed by Laszlo Bodor)
---
.../library/vertexmanager/ShuffleVertexManagerBase.java | 2 ++
.../runtime/library/common/sort/impl/ExternalSorter.java | 1 +
.../runtime/library/common/sort/impl/PipelinedSorter.java | 6 +++---
.../library/common/sort/impl/dflt/DefaultSorter.java | 6 +++---
.../vertexmanager/TestShuffleVertexManagerUtils.java | 15 +++------------
5 files changed, 12 insertions(+), 18 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
index c6264fb2f..1d55c7194 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
@@ -130,7 +130,9 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
final BitSet finishedTaskSet;
int numTasks;
int numVMEventsReceived;
+ // The total uncompressed size
long outputSize;
+ // The uncompressed size of each partition. The size might not be precise
int[] statsInMB;
EdgeManagerPluginDescriptor newDescriptor;
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 758c06979..232d96430 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -131,6 +131,7 @@ public abstract class ExternalSorter {
protected final boolean cleanup;
protected OutputStatisticsReporter statsReporter;
+ // uncompressed size for each partition
protected final long[] partitionStats;
protected final boolean finalMergeEnabled;
protected final boolean sendEmptyPartitionDetails;
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 08786c9b2..067dcca0c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -622,7 +622,7 @@ public class PipelinedSorter extends ExternalSorter {
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
if (!isFinalMergeEnabled() && reportPartitionStats()) {
- partitionStats[i] += partLength;
+ partitionStats[i] += rawLength;
}
}
@@ -747,7 +747,7 @@ public class PipelinedSorter extends ExternalSorter {
TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile,
localFs);
if (reportPartitionStats()) {
for (int i = 0; i < spillRecord.size(); i++) {
- partitionStats[i] += spillRecord.getIndex(i).getPartLength();
+ partitionStats[i] += spillRecord.getIndex(i).getRawLength();
}
}
numShuffleChunks.setValue(numSpills);
@@ -832,7 +832,7 @@ public class PipelinedSorter extends ExternalSorter {
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
- partitionStats[parts] += partLength;
+ partitionStats[parts] += rawLength;
}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 7c678749b..6354c7cc4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -956,7 +956,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
if (!isFinalMergeEnabled() && reportPartitionStats() && writer !=
null) {
- partitionStats[i] += partLength;
+ partitionStats[i] += rawLength;
}
writer = null;
} finally {
@@ -1244,7 +1244,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
}
if (spillRecord != null && reportPartitionStats()) {
for(int i=0; i < spillRecord.size(); i++) {
- partitionStats[i] += spillRecord.getIndex(i).getPartLength();
+ partitionStats[i] += spillRecord.getIndex(i).getRawLength();
}
}
numShuffleChunks.setValue(numSpills);
@@ -1388,7 +1388,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
- partitionStats[parts] += partLength;
+ partitionStats[parts] += rawLength;
}
}
numShuffleChunks.setValue(1); //final merge has happened
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
index 44adc462b..5d1509754 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.library.vertexmanager;
import com.google.protobuf.ByteString;
+import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.common.ReflectionUtils;
@@ -125,10 +126,10 @@ public class TestShuffleVertexManagerUtils {
long uncompressedTotalSize, String vertexName, boolean
reportDetailedStats)
throws IOException {
ByteBuffer payload;
- long totalSize = 0;
+ final long totalSize;
// Use partition sizes to compute the total size.
if (partitionSizes != null) {
- totalSize = estimatedUncompressedSum(partitionSizes);
+ totalSize = Arrays.stream(partitionSizes).sum();
} else {
totalSize = uncompressedTotalSize;
}
@@ -169,16 +170,6 @@ public class TestShuffleVertexManagerUtils {
return vmEvent;
}
- // Assume 3 : 1 compression ratio to estimate the total size
- // of all partitions.
- long estimatedUncompressedSum(long[] partitionStats) {
- long sum = 0;
- for (long partition : partitionStats) {
- sum += partition;
- }
- return sum * 3;
- }
-
public static TaskAttemptIdentifier createTaskAttemptIdentifier(String
vName, int tId) {
VertexIdentifier mockVertex = mock(VertexIdentifier.class);
when(mockVertex.getName()).thenReturn(vName);