xintongsong commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1212751146


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -80,13 +107,23 @@ public void write(
 
         if (isBroadcast && !isBroadcastOnly) {
             for (int i = 0; i < numSubpartitions; ++i) {
-                bufferAccumulator.receive(record.duplicate(), subpartitionId, 
dataType);
+                // As the tiered storage subpartition ID is created only for 
broadcast records,
+                // which are fewer than normal records, the performance impact 
is expected to be

Review Comment:
   "the performance impact of generating new `TieredStorageSubpartitionId` 
objects"



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -100,26 +137,95 @@ public void close() {
      */
     private void writeAccumulatedBuffers(
             TieredStorageSubpartitionId subpartitionId, List<Buffer> 
accumulatedBuffers) {
-        try {
-            for (Buffer finishedBuffer : accumulatedBuffers) {
-                writeAccumulatedBuffer(subpartitionId, finishedBuffer);
+        Iterator<Buffer> bufferIterator = accumulatedBuffers.iterator();
+
+        int numWriteBytes = 0;
+        int numWriteBuffers = 0;
+        while (bufferIterator.hasNext()) {
+            Buffer buffer = bufferIterator.next();
+            try {
+                writeAccumulatedBuffer(subpartitionId, buffer);
+            } catch (IOException ioe) {
+                buffer.recycleBuffer();
+                while (bufferIterator.hasNext()) {
+                    bufferIterator.next().recycleBuffer();
+                }
+                ExceptionUtils.rethrow(ioe);
             }
-        } catch (IOException e) {
-            ExceptionUtils.rethrow(e);
+            numWriteBuffers++;
+            numWriteBytes += buffer.readableBytes();
         }
+        updateMetricStatistics(numWriteBuffers, numWriteBytes);
     }
 
     /**
      * Write the accumulated buffer of this subpartitionId to an appropriate 
tier. After the tier is
      * decided, the buffer will be written to the selected tier.
      *
+     * <p>Note that the method only throws an exception when choosing a 
storage tier, so the caller
+     * should ensure that the buffer is recycled when throwing an exception.
+     *
      * @param subpartitionId the subpartition identifier
      * @param accumulatedBuffer one accumulated buffer of this subpartition
      */
     private void writeAccumulatedBuffer(
             TieredStorageSubpartitionId subpartitionId, Buffer 
accumulatedBuffer)
             throws IOException {
-        // TODO, Try to write the accumulated buffer to the appropriate tier. 
After the tier is
-        // decided, then write the accumulated buffer to the tier.
+        Buffer compressedBuffer = compressBufferIfPossible(accumulatedBuffer);
+
+        if (currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] 
== null) {
+            chooseStorageTierToStartSegment(subpartitionId);
+        }
+
+        boolean isSuccess =
+                
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write(
+                        subpartitionId, compressedBuffer);
+        if (!isSuccess) {
+            chooseStorageTierToStartSegment(subpartitionId);
+            isSuccess =
+                    
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write(
+                            subpartitionId, compressedBuffer);
+            checkState(isSuccess, "Failed to write the first buffer to the new 
segment");
+        }
+    }
+
+    private void chooseStorageTierToStartSegment(TieredStorageSubpartitionId 
subpartitionId)
+            throws IOException {
+        int subpartitionIndex = subpartitionId.getSubpartitionId();
+        int segmentIndex = currentSubpartitionSegmentId[subpartitionIndex];
+        int nextSegmentIndex = segmentIndex + 1;
+
+        for (TierProducerAgent tierProducerAgent : tierProducerAgents) {
+            if (tierProducerAgent.tryStartNewSegment(subpartitionId, 
nextSegmentIndex)) {
+                // Update the segment index and the chosen storage tier for 
the subpartition.
+                currentSubpartitionSegmentId[subpartitionIndex] = 
nextSegmentIndex;
+                currentSubpartitionTierAgent[subpartitionIndex] = 
tierProducerAgent;
+                return;
+            }
+        }
+        throw new IOException("Failed to choose a storage tier to start a new 
segment.");
+    }
+
+    private Buffer compressBufferIfPossible(Buffer buffer) {
+        if (!canBeCompressed(buffer)) {
+            return buffer;
+        }
+
+        return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
+    }
+
+    /**
+     * Whether the buffer can be compressed or not. Note that event is not 
compressed because it is
+     * usually small and the size can become even larger after compression.
+     */
+    private boolean canBeCompressed(Buffer buffer) {
+        return bufferCompressor != null && buffer.isBuffer() && 
buffer.readableBytes() > 0;
+    }
+
+    private void updateMetricStatistics(int numWriteBuffers, int 
numWriteBytes) {

Review Comment:
   ```suggestion
       private void updateMetricStatistics(int numWriteBuffersDelta, int 
numWriteBytesDelta) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -100,26 +137,95 @@ public void close() {
      */
     private void writeAccumulatedBuffers(
             TieredStorageSubpartitionId subpartitionId, List<Buffer> 
accumulatedBuffers) {
-        try {
-            for (Buffer finishedBuffer : accumulatedBuffers) {
-                writeAccumulatedBuffer(subpartitionId, finishedBuffer);
+        Iterator<Buffer> bufferIterator = accumulatedBuffers.iterator();
+
+        int numWriteBytes = 0;
+        int numWriteBuffers = 0;
+        while (bufferIterator.hasNext()) {
+            Buffer buffer = bufferIterator.next();
+            try {
+                writeAccumulatedBuffer(subpartitionId, buffer);
+            } catch (IOException ioe) {
+                buffer.recycleBuffer();
+                while (bufferIterator.hasNext()) {
+                    bufferIterator.next().recycleBuffer();
+                }
+                ExceptionUtils.rethrow(ioe);
             }
-        } catch (IOException e) {
-            ExceptionUtils.rethrow(e);
+            numWriteBuffers++;
+            numWriteBytes += buffer.readableBytes();
         }
+        updateMetricStatistics(numWriteBuffers, numWriteBytes);
     }
 
     /**
      * Write the accumulated buffer of this subpartitionId to an appropriate 
tier. After the tier is
      * decided, the buffer will be written to the selected tier.
      *
+     * <p>Note that the method only throws an exception when choosing a 
storage tier, so the caller
+     * should ensure that the buffer is recycled when throwing an exception.
+     *
      * @param subpartitionId the subpartition identifier
      * @param accumulatedBuffer one accumulated buffer of this subpartition
      */
     private void writeAccumulatedBuffer(
             TieredStorageSubpartitionId subpartitionId, Buffer 
accumulatedBuffer)
             throws IOException {
-        // TODO, Try to write the accumulated buffer to the appropriate tier. 
After the tier is
-        // decided, then write the accumulated buffer to the tier.
+        Buffer compressedBuffer = compressBufferIfPossible(accumulatedBuffer);
+
+        if (currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] 
== null) {
+            chooseStorageTierToStartSegment(subpartitionId);
+        }
+
+        boolean isSuccess =
+                
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write(
+                        subpartitionId, compressedBuffer);
+        if (!isSuccess) {
+            chooseStorageTierToStartSegment(subpartitionId);
+            isSuccess =
+                    
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write(
+                            subpartitionId, compressedBuffer);
+            checkState(isSuccess, "Failed to write the first buffer to the new 
segment");
+        }
+    }
+
+    private void chooseStorageTierToStartSegment(TieredStorageSubpartitionId 
subpartitionId)
+            throws IOException {
+        int subpartitionIndex = subpartitionId.getSubpartitionId();
+        int segmentIndex = currentSubpartitionSegmentId[subpartitionIndex];
+        int nextSegmentIndex = segmentIndex + 1;
+
+        for (TierProducerAgent tierProducerAgent : tierProducerAgents) {
+            if (tierProducerAgent.tryStartNewSegment(subpartitionId, 
nextSegmentIndex)) {
+                // Update the segment index and the chosen storage tier for 
the subpartition.
+                currentSubpartitionSegmentId[subpartitionIndex] = 
nextSegmentIndex;
+                currentSubpartitionTierAgent[subpartitionIndex] = 
tierProducerAgent;
+                return;
+            }
+        }
+        throw new IOException("Failed to choose a storage tier to start a new 
segment.");
+    }
+
+    private Buffer compressBufferIfPossible(Buffer buffer) {
+        if (!canBeCompressed(buffer)) {
+            return buffer;
+        }
+
+        return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
+    }
+
+    /**
+     * Whether the buffer can be compressed or not. Note that event is not 
compressed because it is
+     * usually small and the size can become even larger after compression.
+     */
+    private boolean canBeCompressed(Buffer buffer) {
+        return bufferCompressor != null && buffer.isBuffer() && 
buffer.readableBytes() > 0;
+    }
+
+    private void updateMetricStatistics(int numWriteBuffers, int 
numWriteBytes) {
+        if (metricStatisticsUpdater != null) {

Review Comment:
   I think we should require the updater to be set before writing records. 
Otherwise, the statistics would be incorrect, which is also hard to discover.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerMetricStatistics.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+/** The metric statistics for the tiered storage producer. */
+public class TieredStorageProducerMetricStatistics {

Review Comment:
   ```suggestion
   public class TieredStorageProducerStatisticUpdate {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to