This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 765467f94ee [hotfix][runtime] Avoid duplicating broadcast records redundantly for hybrid shuffle
765467f94ee is described below
commit 765467f94eef2c9487ab590efe1717879fb6733d
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Jul 5 16:32:21 2024 +0800
[hotfix][runtime] Avoid duplicating broadcast records redundantly for hybrid shuffle
---
.../hybrid/tiered/storage/TieredStorageProducerClient.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 80a712d9146..9a03aef0005 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -105,16 +105,15 @@ public class TieredStorageProducerClient {
throws IOException {
if (isBroadcast && !isBroadcastOnly) {
+ int currentPosition = record.position();
for (int i = 0; i < numSubpartitions; ++i) {
// As the tiered storage subpartition ID is created only for broadcast records,
// which are fewer than normal records, the performance impact of generating new
// TieredStorageSubpartitionId objects is expected to be manageable. If the
// performance is significantly affected, this logic will be optimized accordingly.
bufferAccumulator.receive(
- record.duplicate(),
- new TieredStorageSubpartitionId(i),
- dataType,
- isBroadcast);
+ record, new TieredStorageSubpartitionId(i), dataType,
isBroadcast);
+ record.position(currentPosition);
}
} else {
bufferAccumulator.receive(record, subpartitionId, dataType,
isBroadcast);