TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214057512


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -80,13 +107,23 @@ public void write(
 
         if (isBroadcast && !isBroadcastOnly) {
             for (int i = 0; i < numSubpartitions; ++i) {
-                bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType);
+                // As the tiered storage subpartition ID is created only for broadcast records,
+                // which are fewer than normal records, the performance impact of generating new
+                // TieredStorageSubpartitionId objects is expected to be manageable. If the
+                // performance is significantly affected, this logic will be optimized accordingly.
+                bufferAccumulator.receive(
+                        record.duplicate(), new TieredStorageSubpartitionId(i), dataType);
             }
         } else {
             bufferAccumulator.receive(record, subpartitionId, dataType);
         }
     }
 
+    public void setMetricStatisticsUpdater(
+            Consumer<TieredStorageProducerMetricUpdate> metricStatisticsUpdater) {
+        this.metricStatisticsUpdater = metricStatisticsUpdater;

Review Comment:
   Ah. I missed this. Fixed this now.


