This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9880ba5324d [FLINK-31637][network] Implement the HashBufferAccumulator 
for the tiered storage
9880ba5324d is described below

commit 9880ba5324d4a1252d6ae1a3f0f061e4469a05ac
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed May 10 13:58:00 2023 +0800

    [FLINK-31637][network] Implement the HashBufferAccumulator for the tiered 
storage
    
    This closes #22501
---
 .../hybrid/tiered/storage/BufferAccumulator.java   |   7 +-
 .../tiered/storage/HashBufferAccumulator.java      | 110 +++++++++++++
 .../storage/HashSubpartitionBufferAccumulator.java | 159 +++++++++++++++++++
 .../HashSubpartitionBufferAccumulatorContext.java} |  40 ++---
 .../storage/TieredStorageProducerClient.java       |   2 +-
 .../hybrid/tiered/TestingBufferAccumulator.java    |   4 +-
 .../tiered/storage/HashBufferAccumulatorTest.java  | 170 +++++++++++++++++++++
 7 files changed, 465 insertions(+), 27 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
index e57f8383051..762aae35bdc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
@@ -30,19 +30,16 @@ import java.util.function.BiConsumer;
  * Accumulates received records into buffers. The {@link BufferAccumulator} 
receives the records
  * from tiered store producer and the records will accumulate and transform 
into buffers.
  */
-public interface BufferAccumulator {
+public interface BufferAccumulator extends AutoCloseable {
 
     /**
      * Setup the accumulator.
      *
-     * @param numSubpartitions number of subpartitions
      * @param bufferFlusher accepts the accumulated buffers. The first field 
is the subpartition id,
      *     while the list in the second field contains accumulated buffers in 
order for that
      *     subpartition.
      */
-    void setup(
-            int numSubpartitions,
-            BiConsumer<TieredStorageSubpartitionId, List<Buffer>> 
bufferFlusher);
+    void setup(BiConsumer<TieredStorageSubpartitionId, List<Buffer>> 
bufferFlusher);
 
     /**
      * Receives the records from tiered store producer, these records will be 
accumulated and
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java
new file mode 100644
index 00000000000..6f87c423a5c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The hash implementation of the {@link BufferAccumulator}. The {@link 
BufferAccumulator} receives
+ * the records from {@link TieredStorageProducerClient} and the records will 
accumulate and
+ * transform to finished buffers. The accumulated buffers will be transferred 
to the corresponding
+ * tier dynamically.
+ *
+ * <p>To avoid the buffer waiting deadlock between the subpartitions, the 
{@link
+ * HashBufferAccumulator} requires at least n buffers (n is the number of 
subpartitions) to make
+ * sure that each subpartition has at least one buffer to accumulate the 
receiving data. Once an
+ * accumulated buffer is finished, the buffer will be flushed immediately.
+ *
+ * <p>Note that this class need not be thread-safe, because it should only be 
accessed from the main
+ * thread.
+ */
+public class HashBufferAccumulator
+        implements BufferAccumulator, HashSubpartitionBufferAccumulatorContext 
{
+
+    private final TieredStorageMemoryManager memoryManager;
+
+    private final HashSubpartitionBufferAccumulator[] 
hashSubpartitionBufferAccumulators;
+
+    /**
+     * The {@link HashBufferAccumulator}'s accumulated buffer flusher is not 
prepared during
+     * construction, requiring the field to be initialized during setup. 
Therefore, it is necessary
+     * to verify whether this field is null before using it.
+     */
+    @Nullable
+    private BiConsumer<TieredStorageSubpartitionId, List<Buffer>> 
accumulatedBufferFlusher;
+
+    public HashBufferAccumulator(
+            int numSubpartitions, int bufferSize, TieredStorageMemoryManager 
memoryManager) {
+        this.memoryManager = memoryManager;
+        this.hashSubpartitionBufferAccumulators =
+                new HashSubpartitionBufferAccumulator[numSubpartitions];
+        for (int i = 0; i < numSubpartitions; i++) {
+            hashSubpartitionBufferAccumulators[i] =
+                    new HashSubpartitionBufferAccumulator(
+                            new TieredStorageSubpartitionId(i), bufferSize, 
this);
+        }
+    }
+
+    @Override
+    public void setup(
+            BiConsumer<TieredStorageSubpartitionId, List<Buffer>> 
accumulatedBufferFlusher) {
+        this.accumulatedBufferFlusher = accumulatedBufferFlusher;
+    }
+
+    @Override
+    public void receive(
+            ByteBuffer record, TieredStorageSubpartitionId subpartitionId, 
Buffer.DataType dataType)
+            throws IOException {
+        getSubpartitionAccumulator(subpartitionId).append(record, dataType);
+    }
+
+    @Override
+    public void close() {
+        Arrays.stream(hashSubpartitionBufferAccumulators)
+                .forEach(HashSubpartitionBufferAccumulator::close);
+    }
+
+    @Override
+    public BufferBuilder requestBufferBlocking() {
+        return memoryManager.requestBufferBlocking(this);
+    }
+
+    @Override
+    public void flushAccumulatedBuffers(
+            TieredStorageSubpartitionId subpartitionId, List<Buffer> 
accumulatedBuffers) {
+        checkNotNull(accumulatedBufferFlusher).accept(subpartitionId, 
accumulatedBuffers);
+    }
+
+    private HashSubpartitionBufferAccumulator getSubpartitionAccumulator(
+            TieredStorageSubpartitionId subpartitionId) {
+        return 
hashSubpartitionBufferAccumulators[subpartitionId.getSubpartitionId()];
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
new file mode 100644
index 00000000000..71e4a31adbf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link HashSubpartitionBufferAccumulator} accumulates the records in a 
subpartition.
+ *
+ * <p>Note that this class need not be thread-safe, because it should only be 
accessed from the main
+ * thread.
+ */
+public class HashSubpartitionBufferAccumulator {
+
+    private final TieredStorageSubpartitionId subpartitionId;
+
+    private final int bufferSize;
+
+    private final HashSubpartitionBufferAccumulatorContext 
bufferAccumulatorContext;
+
+    private final Queue<BufferBuilder> unfinishedBuffers = new LinkedList<>();
+
+    public HashSubpartitionBufferAccumulator(
+            TieredStorageSubpartitionId subpartitionId,
+            int bufferSize,
+            HashSubpartitionBufferAccumulatorContext bufferAccumulatorContext) 
{
+        this.subpartitionId = subpartitionId;
+        this.bufferSize = bufferSize;
+        this.bufferAccumulatorContext = bufferAccumulatorContext;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Called by HashBufferAccumulator
+    // ------------------------------------------------------------------------
+
+    public void append(ByteBuffer record, Buffer.DataType dataType) throws 
IOException {
+        if (dataType.isEvent()) {
+            writeEvent(record, dataType);
+        } else {
+            writeRecord(record, dataType);
+        }
+    }
+
+    public void close() {
+        checkState(unfinishedBuffers.isEmpty(), "There are unfinished 
buffers.");
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void writeEvent(ByteBuffer event, Buffer.DataType dataType) {
+        checkArgument(dataType.isEvent());
+
+        // Each event should take an exclusive buffer
+        finishCurrentWritingBufferIfNotEmpty();
+
+        // Store the events in the heap segments to improve network memory 
efficiency
+        MemorySegment data = MemorySegmentFactory.wrap(event.array());
+        flushFinishedBuffer(
+                new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, 
dataType, data.size()));
+    }
+
+    private void writeRecord(ByteBuffer record, Buffer.DataType dataType) {
+        checkArgument(!dataType.isEvent());
+
+        ensureCapacityForRecord(record);
+
+        writeRecord(record);
+    }
+
+    private void ensureCapacityForRecord(ByteBuffer record) {
+        final int numRecordBytes = record.remaining();
+        int availableBytes =
+                Optional.ofNullable(unfinishedBuffers.peek())
+                        .map(
+                                currentWritingBuffer ->
+                                        currentWritingBuffer.getWritableBytes()
+                                                + bufferSize * 
(unfinishedBuffers.size() - 1))
+                        .orElse(0);
+
+        while (availableBytes < numRecordBytes) {
+            BufferBuilder bufferBuilder = 
bufferAccumulatorContext.requestBufferBlocking();
+            unfinishedBuffers.add(bufferBuilder);
+            availableBytes += bufferSize;
+        }
+    }
+
+    private void writeRecord(ByteBuffer record) {
+        while (record.hasRemaining()) {
+            BufferBuilder currentWritingBuffer = 
checkNotNull(unfinishedBuffers.peek());
+            currentWritingBuffer.append(record);
+            if (currentWritingBuffer.isFull()) {
+                finishCurrentWritingBuffer();
+            }
+        }
+    }
+
+    private void finishCurrentWritingBufferIfNotEmpty() {
+        BufferBuilder currentWritingBuffer = unfinishedBuffers.peek();
+        if (currentWritingBuffer == null || 
currentWritingBuffer.getWritableBytes() == bufferSize) {
+            return;
+        }
+
+        finishCurrentWritingBuffer();
+    }
+
+    private void finishCurrentWritingBuffer() {
+        BufferBuilder currentWritingBuffer = unfinishedBuffers.poll();
+        if (currentWritingBuffer == null) {
+            return;
+        }
+        currentWritingBuffer.finish();
+        BufferConsumer bufferConsumer = 
currentWritingBuffer.createBufferConsumerFromBeginning();
+        Buffer buffer = bufferConsumer.build();
+        currentWritingBuffer.close();
+        bufferConsumer.close();
+        flushFinishedBuffer(buffer);
+    }
+
+    private void flushFinishedBuffer(Buffer finishedBuffer) {
+        bufferAccumulatorContext.flushAccumulatedBuffers(
+                subpartitionId, Collections.singletonList(finishedBuffer));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulatorContext.java
similarity index 56%
copy from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulatorContext.java
index 9430f5f3b2e..11853b7d341 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulatorContext.java
@@ -16,30 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
-import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.function.BiConsumer;
 
-/** Test implementation for {@link BufferAccumulator}. */
-public class TestingBufferAccumulator implements BufferAccumulator {
-
-    @Override
-    public void setup(
-            int numSubpartitions,
-            BiConsumer<TieredStorageSubpartitionId, List<Buffer>> 
bufferFlusher) {}
+/**
+ * This interface is used by {@link HashSubpartitionBufferAccumulator} to 
operate {@link
+ * HashBufferAccumulator}.
+ */
+public interface HashSubpartitionBufferAccumulatorContext {
 
-    @Override
-    public void receive(
-            ByteBuffer record, TieredStorageSubpartitionId subpartitionId, 
Buffer.DataType dataType)
-            throws IOException {}
+    /**
+     * Request {@link BufferBuilder} from the {@link BufferPool}.
+     *
+     * @return the requested buffer
+     */
+    BufferBuilder requestBufferBlocking();
 
-    @Override
-    public void close() {}
+    /**
+     * Flush the accumulated {@link Buffer}s of the subpartition.
+     *
+     * @param subpartitionId the subpartition id
+     * @param accumulatedBuffers the accumulated buffers
+     */
+    void flushAccumulatedBuffers(
+            TieredStorageSubpartitionId subpartitionId, List<Buffer> 
accumulatedBuffers);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 34a0f03f1e1..9a87ec91ff0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -54,7 +54,7 @@ public class TieredStorageProducerClient {
         this.bufferCompressor = bufferCompressor;
         this.tierProducerAgents = tierProducerAgents;
 
-        bufferAccumulator.setup(numSubpartitions, 
this::writeAccumulatedBuffers);
+        bufferAccumulator.setup(this::writeAccumulatedBuffers);
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
index 9430f5f3b2e..adc9481f4b3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
@@ -31,9 +31,7 @@ import java.util.function.BiConsumer;
 public class TestingBufferAccumulator implements BufferAccumulator {
 
     @Override
-    public void setup(
-            int numSubpartitions,
-            BiConsumer<TieredStorageSubpartitionId, List<Buffer>> 
bufferFlusher) {}
+    public void setup(BiConsumer<TieredStorageSubpartitionId, List<Buffer>> 
bufferFlusher) {}
 
     @Override
     public void receive(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulatorTest.java
new file mode 100644
index 00000000000..4cc8c3e3d16
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulatorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HashBufferAccumulator}. */
+class HashBufferAccumulatorTest {
+
+    public static final int NUM_TOTAL_BUFFERS = 1000;
+
+    public static final int NETWORK_BUFFER_SIZE = 1024;
+
+    private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+    private NetworkBufferPool globalPool;
+
+    @BeforeEach
+    void before() {
+        globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, 
NETWORK_BUFFER_SIZE);
+    }
+
+    @AfterEach
+    void after() {
+        globalPool.destroy();
+    }
+
+    @Test
+    void testAccumulateRecordsAndGenerateFinishedBuffers() throws IOException {
+        int numBuffers = 10;
+        int numRecords = 1000;
+        TieredStorageSubpartitionId subpartitionId = new 
TieredStorageSubpartitionId(0);
+        Random random = new Random();
+
+        TieredStorageMemoryManager tieredStorageMemoryManager =
+                createStorageMemoryManager(numBuffers);
+        try (HashBufferAccumulator bufferAccumulator =
+                new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE, 
tieredStorageMemoryManager)) {
+            AtomicInteger numReceivedFinishedBuffer = new AtomicInteger(0);
+            bufferAccumulator.setup(
+                    ((subpartition, buffers) ->
+                            buffers.forEach(
+                                    buffer -> {
+                                        
numReceivedFinishedBuffer.incrementAndGet();
+                                        buffer.recycleBuffer();
+                                    })));
+
+            int numRecordBytesSinceLastEvent = 0;
+            int numExpectBuffers = 0;
+            for (int i = 0; i < numRecords; i++) {
+                boolean isBuffer = random.nextBoolean() && i != numRecords - 1;
+                ByteBuffer record;
+                Buffer.DataType dataType =
+                        isBuffer ? Buffer.DataType.DATA_BUFFER : 
Buffer.DataType.EVENT_BUFFER;
+                if (isBuffer) {
+                    int numBytes = random.nextInt(2 * NETWORK_BUFFER_SIZE) + 1;
+                    numRecordBytesSinceLastEvent += numBytes;
+                    record = generateRandomData(numBytes, random);
+                } else {
+                    numExpectBuffers +=
+                            numRecordBytesSinceLastEvent / NETWORK_BUFFER_SIZE
+                                    + (numRecordBytesSinceLastEvent % 
NETWORK_BUFFER_SIZE == 0
+                                            ? 0
+                                            : 1);
+                    record = 
EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
+                    numExpectBuffers++;
+                    numRecordBytesSinceLastEvent = 0;
+                }
+                bufferAccumulator.receive(record, subpartitionId, dataType);
+            }
+
+            
assertThat(numReceivedFinishedBuffer.get()).isEqualTo(numExpectBuffers);
+        }
+    }
+
+    @Test
+    void testEventShouldNotRequestBufferFromMemoryManager() throws IOException 
{
+        int numBuffers = 10;
+
+        TieredStorageMemoryManager tieredStorageMemoryManager =
+                createStorageMemoryManager(numBuffers);
+        try (HashBufferAccumulator bufferAccumulator =
+                new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE, 
tieredStorageMemoryManager)) {
+            bufferAccumulator.setup(
+                    ((subpartition, buffers) -> 
buffers.forEach(Buffer::recycleBuffer)));
+
+            ByteBuffer endEvent = 
EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
+            bufferAccumulator.receive(
+                    endEvent, new TieredStorageSubpartitionId(0), 
Buffer.DataType.EVENT_BUFFER);
+
+            
assertThat(tieredStorageMemoryManager.numOwnerRequestedBuffer(bufferAccumulator))
+                    .isZero();
+        }
+    }
+
+    @Test
+    void testCloseWithUnFinishedBuffers() throws IOException {
+        int numBuffers = 10;
+
+        TieredStorageMemoryManager tieredStorageMemoryManager =
+                createStorageMemoryManager(numBuffers);
+        assertThatThrownBy(
+                        () -> {
+                            try (HashBufferAccumulator bufferAccumulator =
+                                    new HashBufferAccumulator(
+                                            1, NETWORK_BUFFER_SIZE, 
tieredStorageMemoryManager)) {
+                                bufferAccumulator.setup(
+                                        ((subpartition, buffers) ->
+                                                
buffers.forEach(Buffer::recycleBuffer)));
+                                bufferAccumulator.receive(
+                                        generateRandomData(1, new Random()),
+                                        new TieredStorageSubpartitionId(0),
+                                        Buffer.DataType.DATA_BUFFER);
+                            }
+                        })
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("There are unfinished buffers");
+    }
+
+    private TieredStorageMemoryManagerImpl createStorageMemoryManager(int 
numBuffersInBufferPool)
+            throws IOException {
+        BufferPool bufferPool =
+                globalPool.createBufferPool(numBuffersInBufferPool, 
numBuffersInBufferPool);
+        TieredStorageMemoryManagerImpl storageMemoryManager =
+                new 
TieredStorageMemoryManagerImpl(NUM_BUFFERS_TRIGGER_FLUSH_RATIO, true);
+        storageMemoryManager.setup(
+                bufferPool, Collections.singletonList(new 
TieredStorageMemorySpec(this, 1)));
+        return storageMemoryManager;
+    }
+
+    private static ByteBuffer generateRandomData(int dataSize, Random random) {
+        byte[] dataWritten = new byte[dataSize];
+        random.nextBytes(dataWritten);
+        return ByteBuffer.wrap(dataWritten);
+    }
+}

Reply via email to