TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214031770
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java:
##########
@@ -35,9 +41,21 @@ public interface TierProducerAgent {
      */
     boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId);

-    /** Writes the finished {@link Buffer} to the consumer. */
-    boolean write(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer)
-            throws IOException;
+    /**
+     * Writes the finished {@link Buffer} to the consumer.
+     *
+     * <p>Note that the tier must ensure the buffer is written successfully without throwing any
+     * exception, so that the buffer is guaranteed to be recycled. If a subsequent modification
+     * makes this method throw an exception, the caller must make sure that the buffer is
+     * eventually recycled.
+     *
+     * @param subpartitionId the subpartition id that the buffer is written to
+     * @param finishedBuffer the buffer to be written
+     * @return true if the buffer is written successfully, or false if the current segment cannot
+     *     store the buffer and the segment is finished. When false is returned, the agent should
+     *     try to start a new segment before the buffer is written again.
+     */
+    boolean write(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer);
Review Comment:
Renamed it.
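
For context, the caller-side half of this contract (retry on a fresh segment when write returns false, recycle the buffer if the hand-off ultimately fails) can be sketched as below. This is an illustrative sketch, not code from this PR: the helper class, the nextSegmentId argument, and the import path of TieredStorageSubpartitionId are assumptions.

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;

/** Illustrative sketch of the caller side of the write() contract. */
final class WriteContractSketch {

    /** Writes one buffer, retrying once on a fresh segment; recycles the buffer on failure. */
    static void writeOrRecycle(
            TierProducerAgent agent,
            TieredStorageSubpartitionId subpartitionId,
            Buffer buffer,
            int nextSegmentId) {
        if (agent.write(subpartitionId, buffer)) {
            return; // written successfully; the tier now owns the buffer
        }
        // false means the current segment is finished: start a new segment, retry once.
        if (agent.tryStartNewSegment(subpartitionId, nextSegmentId)
                && agent.write(subpartitionId, buffer)) {
            return;
        }
        // The buffer could not be handed off, so the caller recycles it.
        buffer.recycleBuffer();
    }
}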
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -100,26 +137,95 @@ public void close() {
      */
     private void writeAccumulatedBuffers(
             TieredStorageSubpartitionId subpartitionId, List<Buffer> accumulatedBuffers) {
-        try {
-            for (Buffer finishedBuffer : accumulatedBuffers) {
-                writeAccumulatedBuffer(subpartitionId, finishedBuffer);
+        Iterator<Buffer> bufferIterator = accumulatedBuffers.iterator();
+
+        int numWriteBytes = 0;
+        int numWriteBuffers = 0;
+        while (bufferIterator.hasNext()) {
+            Buffer buffer = bufferIterator.next();
+            try {
+                writeAccumulatedBuffer(subpartitionId, buffer);
+            } catch (IOException ioe) {
+                buffer.recycleBuffer();
+                while (bufferIterator.hasNext()) {
+                    bufferIterator.next().recycleBuffer();
+                }
+                ExceptionUtils.rethrow(ioe);
             }
-        } catch (IOException e) {
-            ExceptionUtils.rethrow(e);
+            numWriteBuffers++;
+            numWriteBytes += buffer.readableBytes();
         }
+        updateMetricStatistics(numWriteBuffers, numWriteBytes);
     }

     /**
      * Write the accumulated buffer of this subpartitionId to an appropriate tier. After the tier
      * is decided, the buffer will be written to the selected tier.
      *
+     * <p>Note that the method only throws an exception when choosing a storage tier, so the
+     * caller should ensure that the buffer is recycled if an exception is thrown.
+     *
      * @param subpartitionId the subpartition identifier
      * @param accumulatedBuffer one accumulated buffer of this subpartition
      */
     private void writeAccumulatedBuffer(
             TieredStorageSubpartitionId subpartitionId, Buffer accumulatedBuffer)
             throws IOException {
-        // TODO, Try to write the accumulated buffer to the appropriate tier. After the tier is
-        // decided, then write the accumulated buffer to the tier.
+        Buffer compressedBuffer = compressBufferIfPossible(accumulatedBuffer);
+
+        if (currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] == null) {
+            chooseStorageTierToStartSegment(subpartitionId);
+        }
+
+        boolean isSuccess =
+                currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write(
+                        subpartitionId, compressedBuffer);
+        if (!isSuccess) {
+            chooseStorageTierToStartSegment(subpartitionId);
+            isSuccess =
+                    currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write(
+                            subpartitionId, compressedBuffer);
+            checkState(isSuccess, "Failed to write the first buffer to the new segment");
+        }
Review Comment:
Fixed.
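
To make the ownership rules concrete from the tier's side, here is a minimal standalone sketch that mirrors the two methods from the first hunk. The class name, the fixed segment size, and the byte bookkeeping are hypothetical; only the signatures and the recycle-on-success / return-false-when-full behavior follow the javadoc above.

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;

/** Hypothetical tier-side sketch honoring the buffer-ownership rules of write(). */
final class SketchTierAgent {

    private static final int SEGMENT_SIZE_BYTES = 4 * 1024 * 1024; // assumed segment size

    private int remainingBytes = -1; // -1 means no open segment

    boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId) {
        // A real tier would check resources (memory, disk quota, ...) before accepting.
        remainingBytes = SEGMENT_SIZE_BYTES;
        return true;
    }

    boolean write(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) {
        if (remainingBytes < finishedBuffer.readableBytes()) {
            // Segment is full (or never started): finish it and ask the caller
            // to start a new one. The buffer is NOT recycled here; ownership
            // stays with the caller.
            remainingBytes = -1;
            return false;
        }
        remainingBytes -= finishedBuffer.readableBytes();
        // ... hand the buffer to the actual storage medium here ...
        // After a successful write, the tier is responsible for recycling.
        finishedBuffer.recycleBuffer();
        return true;
    }
}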