reswqa commented on code in PR #22958:
URL: https://github.com/apache/flink/pull/22958#discussion_r1253950867
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java:
##########
@@ -27,6 +27,8 @@
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import net.jcip.annotations.GuardedBy;
Review Comment:
This is the wrong import; it should be `javax.annotation.concurrent.GuardedBy`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java:
##########
@@ -152,8 +185,26 @@ private void finishCurrentWritingBuffer() {
flushFinishedBuffer(buffer);
}
+ private void recycleBuffers() {
+ synchronized (lock) {
+ while (!unfinishedBuffers.isEmpty()) {
+ unfinishedBuffers
+ .poll()
+ .createBufferConsumerFromBeginning()
+ .build()
+ .recycleBuffer();
+ }
+ }
+ }
+
private void flushFinishedBuffer(Buffer finishedBuffer) {
- bufferAccumulatorContext.flushAccumulatedBuffers(
- subpartitionId, Collections.singletonList(finishedBuffer));
+ synchronized (lock) {
+ if (isReleased) {
Review Comment:
Are this check and the release necessary?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java:
##########
@@ -122,6 +152,9 @@ private void ensureCapacityForRecord(ByteBuffer record) {
private void writeRecord(ByteBuffer record) {
while (record.hasRemaining()) {
+ if (isReleased) {
Review Comment:
Is this check necessary?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java:
##########
@@ -68,17 +83,32 @@ public HashSubpartitionBufferAccumulator(
// ------------------------------------------------------------------------
public void append(ByteBuffer record, Buffer.DataType dataType) throws IOException {
- if (dataType.isEvent()) {
- writeEvent(record, dataType);
- } else {
- writeRecord(record, dataType);
+ synchronized (lock) {
+ if (isReleased) {
+ return;
+ }
+ if (dataType.isEvent()) {
+ writeEvent(record, dataType);
+ } else {
+ writeRecord(record, dataType);
+ }
}
}
+ /** This method can only be called by the task thread. */
public void close() {
+ recycleBuffers();
checkState(unfinishedBuffers.isEmpty(), "There are unfinished buffers.");
}
+ public void release() {
Review Comment:
Maybe we only need to release the buffers in the `close` method, which would
also let us get rid of this lock.
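A minimal sketch of that idea, using hypothetical stand-in classes (`ToyBuffer` and `ToyAccumulator` are not Flink classes) and assuming that only the task thread ever touches the accumulator. Under that assumption no lock and no `isReleased` flag are needed, because `close` is the single place where pending buffers are recycled:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a pooled buffer; not a Flink class. */
final class ToyBuffer {
    private boolean recycled;

    void recycle() {
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical accumulator that is confined to one thread, so it takes no lock. */
final class ToyAccumulator {
    private final Queue<ToyBuffer> unfinishedBuffers = new ArrayDeque<>();

    /** Assumed to be called only by the task thread. */
    ToyBuffer requestBuffer() {
        ToyBuffer buffer = new ToyBuffer();
        unfinishedBuffers.add(buffer);
        return buffer;
    }

    /** Assumed to be called only by the task thread: recycle whatever is still pending. */
    void close() {
        ToyBuffer buffer;
        while ((buffer = unfinishedBuffers.poll()) != null) {
            buffer.recycle();
        }
    }

    int pendingBuffers() {
        return unfinishedBuffers.size();
    }
}
```

Whether this applies here depends on whether `release` can really be called from a thread other than the task thread; if it can, some synchronization is still required.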
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java:
##########
@@ -152,8 +185,26 @@ private void finishCurrentWritingBuffer() {
flushFinishedBuffer(buffer);
}
+ private void recycleBuffers() {
+ synchronized (lock) {
+ while (!unfinishedBuffers.isEmpty()) {
+ unfinishedBuffers
+ .poll()
+ .createBufferConsumerFromBeginning()
Review Comment:
This will not actually recycle the buffer.
Won't `BufferBuilder`'s `close` recycle this buffer? Why do we need to
convert it into a `Buffer` and then recycle it?
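To illustrate the concern with a toy model (hypothetical classes, not Flink's implementation): assume the builder holds one reference to the underlying memory and that creating a consumer view retains an additional one. Recycling only the view then leaves the builder's reference outstanding, whereas closing the builder drops it directly:

```java
/** Hypothetical reference-counted memory; stands in for a pooled network buffer. */
final class RefCountedSegment {
    private int refCount = 1; // the creator holds the initial reference

    RefCountedSegment retain() {
        if (refCount == 0) {
            throw new IllegalStateException("already returned to the pool");
        }
        refCount++;
        return this;
    }

    void recycle() {
        if (refCount == 0) {
            throw new IllegalStateException("already returned to the pool");
        }
        refCount--;
    }

    boolean isReturnedToPool() {
        return refCount == 0;
    }
}

/** Hypothetical builder that owns one reference to the segment. */
final class ToyBuilder implements AutoCloseable {
    private final RefCountedSegment segment;

    ToyBuilder(RefCountedSegment segment) {
        this.segment = segment;
    }

    /** Assumption: a consumer view retains its own reference to the segment. */
    RefCountedSegment createConsumerView() {
        return segment.retain();
    }

    /** Drops the builder's own reference. */
    @Override
    public void close() {
        segment.recycle();
    }
}
```

In this model, `builder.createConsumerView().recycle()` returns the count to where it started and the memory stays allocated; only `builder.close()` returns it to the pool.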
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.java:
##########
@@ -25,8 +25,11 @@
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
import org.apache.flink.runtime.io.network.partition.DataBuffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import net.jcip.annotations.GuardedBy;
Review Comment:
This is the wrong import; it should be `javax.annotation.concurrent.GuardedBy`.