This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 765467f94ee [hotfix][runtime] Avoid duplicating broadcast records 
redundantly for hybrid shuffle
765467f94ee is described below

commit 765467f94eef2c9487ab590efe1717879fb6733d
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Jul 5 16:32:21 2024 +0800

    [hotfix][runtime] Avoid duplicating broadcast records redundantly for 
hybrid shuffle
---
 .../hybrid/tiered/storage/TieredStorageProducerClient.java         | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 80a712d9146..9a03aef0005 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -105,16 +105,15 @@ public class TieredStorageProducerClient {
             throws IOException {
 
         if (isBroadcast && !isBroadcastOnly) {
+            int currentPosition = record.position();
             for (int i = 0; i < numSubpartitions; ++i) {
                 // As the tiered storage subpartition ID is created only for 
broadcast records,
                 // which are fewer than normal records, the performance impact 
of generating new
                 // TieredStorageSubpartitionId objects is expected to be 
manageable. If the
                 // performance is significantly affected, this logic will be 
optimized accordingly.
                 bufferAccumulator.receive(
-                        record.duplicate(),
-                        new TieredStorageSubpartitionId(i),
-                        dataType,
-                        isBroadcast);
+                        record, new TieredStorageSubpartitionId(i), dataType, 
isBroadcast);
+                record.position(currentPosition);
             }
         } else {
             bufferAccumulator.receive(record, subpartitionId, dataType, 
isBroadcast);

Reply via email to