This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9880ba5324d [FLINK-31637][network] Implement the HashBufferAccumulator
for the tiered storage
9880ba5324d is described below
commit 9880ba5324d4a1252d6ae1a3f0f061e4469a05ac
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed May 10 13:58:00 2023 +0800
[FLINK-31637][network] Implement the HashBufferAccumulator for the tiered
storage
This closes #22501
---
.../hybrid/tiered/storage/BufferAccumulator.java | 7 +-
.../tiered/storage/HashBufferAccumulator.java | 110 +++++++++++++
.../storage/HashSubpartitionBufferAccumulator.java | 159 +++++++++++++++++++
.../HashSubpartitionBufferAccumulatorContext.java} | 40 ++---
.../storage/TieredStorageProducerClient.java | 2 +-
.../hybrid/tiered/TestingBufferAccumulator.java | 4 +-
.../tiered/storage/HashBufferAccumulatorTest.java | 170 +++++++++++++++++++++
7 files changed, 465 insertions(+), 27 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
index e57f8383051..762aae35bdc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java
@@ -30,19 +30,16 @@ import java.util.function.BiConsumer;
* Accumulates received records into buffers. The {@link BufferAccumulator}
receives the records
* from tiered store producer and the records will accumulate and transform
into buffers.
*/
-public interface BufferAccumulator {
+public interface BufferAccumulator extends AutoCloseable {
/**
* Setup the accumulator.
*
- * @param numSubpartitions number of subpartitions
* @param bufferFlusher accepts the accumulated buffers. The first field
is the subpartition id,
* while the list in the second field contains accumulated buffers in
order for that
* subpartition.
*/
- void setup(
- int numSubpartitions,
- BiConsumer<TieredStorageSubpartitionId, List<Buffer>>
bufferFlusher);
+ void setup(BiConsumer<TieredStorageSubpartitionId, List<Buffer>>
bufferFlusher);
/**
* Receives the records from tiered store producer, these records will be
accumulated and
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java
new file mode 100644
index 00000000000..6f87c423a5c
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The hash implementation of the {@link BufferAccumulator}. It receives records from the {@link
+ * TieredStorageProducerClient} and accumulates them into finished buffers. The accumulated
+ * buffers will be transferred to the corresponding tier dynamically.
+ *
+ * <p>One {@link HashSubpartitionBufferAccumulator} is created per subpartition in the
+ * constructor, and {@link #receive} dispatches every record to the accumulator stored at the
+ * index given by the record's subpartition id. This class also serves as the {@link
+ * HashSubpartitionBufferAccumulatorContext} of those accumulators: buffer requests are delegated
+ * to the {@link TieredStorageMemoryManager} with this accumulator as the owner, and finished
+ * buffers are handed to the flusher registered through {@link #setup}.
+ *
+ * <p>To avoid the buffer waiting deadlock between the subpartitions, the {@link
+ * HashBufferAccumulator} requires at least n buffers (n is the number of subpartitions) to make
+ * sure that each subpartition has at least one buffer to accumulate the receiving data. Once an
+ * accumulated buffer is finished, the buffer will be flushed immediately.
+ *
+ * <p>{@link #close} closes every subpartition accumulator in turn; each of them performs a state
+ * check that fails if it still holds unfinished buffers.
+ *
+ * <p>Note that this class need not be thread-safe, because it should only be accessed from the
+ * main thread.
+ */
+public class HashBufferAccumulator
+ implements BufferAccumulator, HashSubpartitionBufferAccumulatorContext
{
+
+ private final TieredStorageMemoryManager memoryManager;
+
+ private final HashSubpartitionBufferAccumulator[]
hashSubpartitionBufferAccumulators;
+
+ /**
+ * Accepts the finished buffers of a subpartition. The flusher is not available during
+ * construction and is only assigned in {@link #setup}, so this field is null until then.
+ * Therefore, it is necessary to verify whether this field is null before using it, which
+ * {@link #flushAccumulatedBuffers} does on every call.
+ */
+ @Nullable
+ private BiConsumer<TieredStorageSubpartitionId, List<Buffer>>
accumulatedBufferFlusher;
+
+ public HashBufferAccumulator(
+ int numSubpartitions, int bufferSize, TieredStorageMemoryManager
memoryManager) {
+ this.memoryManager = memoryManager;
+ this.hashSubpartitionBufferAccumulators =
+ new HashSubpartitionBufferAccumulator[numSubpartitions];
+ for (int i = 0; i < numSubpartitions; i++) {
+ hashSubpartitionBufferAccumulators[i] =
+ new HashSubpartitionBufferAccumulator(
+ new TieredStorageSubpartitionId(i), bufferSize,
this);
+ }
+ }
+
+ @Override
+ public void setup(
+ BiConsumer<TieredStorageSubpartitionId, List<Buffer>>
accumulatedBufferFlusher) {
+ this.accumulatedBufferFlusher = accumulatedBufferFlusher;
+ }
+
+ @Override
+ public void receive(
+ ByteBuffer record, TieredStorageSubpartitionId subpartitionId,
Buffer.DataType dataType)
+ throws IOException {
+ getSubpartitionAccumulator(subpartitionId).append(record, dataType);
+ }
+
+ @Override
+ public void close() {
+ Arrays.stream(hashSubpartitionBufferAccumulators)
+ .forEach(HashSubpartitionBufferAccumulator::close);
+ }
+
+ @Override
+ public BufferBuilder requestBufferBlocking() {
+ return memoryManager.requestBufferBlocking(this);
+ }
+
+ @Override
+ public void flushAccumulatedBuffers(
+ TieredStorageSubpartitionId subpartitionId, List<Buffer>
accumulatedBuffers) {
+ checkNotNull(accumulatedBufferFlusher).accept(subpartitionId,
accumulatedBuffers);
+ }
+
+ private HashSubpartitionBufferAccumulator getSubpartitionAccumulator(
+ TieredStorageSubpartitionId subpartitionId) {
+ return
hashSubpartitionBufferAccumulators[subpartitionId.getSubpartitionId()];
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
new file mode 100644
index 00000000000..71e4a31adbf
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link HashSubpartitionBufferAccumulator} accumulates the records in a subpartition.
+ *
+ * <p>{@code append} is the single entry point and branches on the data type:
+ *
+ * <ul>
+ *   <li><b>Events</b> first finish the buffer that is currently being written (unless it is
+ *       still completely empty), so that the event never shares a buffer with record data. The
+ *       event's backing array is then wrapped into a heap memory segment and flushed as its own
+ *       {@link NetworkBuffer}; no buffer is requested from the context for an event.
+ *       NOTE(review): the whole backing array is wrapped regardless of the byte buffer's
+ *       position and limit, so this presumably expects exactly-sized heap buffers - confirm
+ *       against the callers.
+ *   <li><b>Records</b> first reserve enough capacity: the writable bytes of the queue head plus
+ *       {@code bufferSize} for every other queued buffer are compared with the record's
+ *       remaining bytes, and further buffers are requested from the context until the record
+ *       fits. The record is then copied into the queue head; whenever the head becomes full it
+ *       is finished and flushed at once, and writing continues with the next queued buffer.
+ * </ul>
+ *
+ * <p>Finishing a buffer means: remove it from the queue, finish the builder, build a {@link
+ * Buffer} from a consumer created from the beginning of the builder, close both the builder and
+ * the consumer, and pass the buffer as a singleton list to the context's flush method.
+ *
+ * <p>{@code close} does not flush anything; it only checks that no unfinished buffers are left
+ * and fails with the message "There are unfinished buffers." otherwise.
+ *
+ * <p>Note that this class need not be thread-safe, because it should only be accessed from the
+ * main thread.
+ */
+public class HashSubpartitionBufferAccumulator {
+
+ private final TieredStorageSubpartitionId subpartitionId;
+
+ private final int bufferSize;
+
+ private final HashSubpartitionBufferAccumulatorContext
bufferAccumulatorContext;
+
+ private final Queue<BufferBuilder> unfinishedBuffers = new LinkedList<>();
+
+ public HashSubpartitionBufferAccumulator(
+ TieredStorageSubpartitionId subpartitionId,
+ int bufferSize,
+ HashSubpartitionBufferAccumulatorContext bufferAccumulatorContext)
{
+ this.subpartitionId = subpartitionId;
+ this.bufferSize = bufferSize;
+ this.bufferAccumulatorContext = bufferAccumulatorContext;
+ }
+
+ // ------------------------------------------------------------------------
+ // Called by HashBufferAccumulator
+ // ------------------------------------------------------------------------
+
+ public void append(ByteBuffer record, Buffer.DataType dataType) throws
IOException {
+ if (dataType.isEvent()) {
+ writeEvent(record, dataType);
+ } else {
+ writeRecord(record, dataType);
+ }
+ }
+
+ public void close() {
+ checkState(unfinishedBuffers.isEmpty(), "There are unfinished
buffers.");
+ }
+
+ // ------------------------------------------------------------------------
+ // Internal Methods
+ // ------------------------------------------------------------------------
+
+ private void writeEvent(ByteBuffer event, Buffer.DataType dataType) {
+ checkArgument(dataType.isEvent());
+
+ // Each event should take an exclusive buffer, so the partially written data buffer is finished first
+ finishCurrentWritingBufferIfNotEmpty();
+
+ // Store the events in the heap segments to improve network memory
efficiency
+ MemorySegment data = MemorySegmentFactory.wrap(event.array());
+ flushFinishedBuffer(
+ new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE,
dataType, data.size()));
+ }
+
+ private void writeRecord(ByteBuffer record, Buffer.DataType dataType) {
+ checkArgument(!dataType.isEvent());
+
+ ensureCapacityForRecord(record);
+
+ writeRecord(record);
+ }
+
+ private void ensureCapacityForRecord(ByteBuffer record) {
+ final int numRecordBytes = record.remaining();
+ int availableBytes =
+ Optional.ofNullable(unfinishedBuffers.peek())
+ .map(
+ currentWritingBuffer ->
+ currentWritingBuffer.getWritableBytes()
+ + bufferSize *
(unfinishedBuffers.size() - 1))
+ .orElse(0);
+
+ while (availableBytes < numRecordBytes) {
+ BufferBuilder bufferBuilder =
bufferAccumulatorContext.requestBufferBlocking();
+ unfinishedBuffers.add(bufferBuilder);
+ availableBytes += bufferSize;
+ }
+ }
+
+ private void writeRecord(ByteBuffer record) {
+ while (record.hasRemaining()) {
+ BufferBuilder currentWritingBuffer =
checkNotNull(unfinishedBuffers.peek());
+ currentWritingBuffer.append(record);
+ if (currentWritingBuffer.isFull()) {
+ finishCurrentWritingBuffer();
+ }
+ }
+ }
+
+ private void finishCurrentWritingBufferIfNotEmpty() {
+ BufferBuilder currentWritingBuffer = unfinishedBuffers.peek();
+ if (currentWritingBuffer == null ||
currentWritingBuffer.getWritableBytes() == bufferSize) {
+ return;
+ }
+
+ finishCurrentWritingBuffer();
+ }
+
+ private void finishCurrentWritingBuffer() {
+ BufferBuilder currentWritingBuffer = unfinishedBuffers.poll();
+ if (currentWritingBuffer == null) {
+ return;
+ }
+ currentWritingBuffer.finish();
+ BufferConsumer bufferConsumer =
currentWritingBuffer.createBufferConsumerFromBeginning();
+ Buffer buffer = bufferConsumer.build();
+ currentWritingBuffer.close();
+ bufferConsumer.close();
+ flushFinishedBuffer(buffer);
+ }
+
+ private void flushFinishedBuffer(Buffer finishedBuffer) {
+ bufferAccumulatorContext.flushAccumulatedBuffers(
+ subpartitionId, Collections.singletonList(finishedBuffer));
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulatorContext.java
similarity index 56%
copy from
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulatorContext.java
index 9430f5f3b2e..11853b7d341 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulatorContext.java
@@ -16,30 +16,34 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
-import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.List;
-import java.util.function.BiConsumer;
-/** Test implementation for {@link BufferAccumulator}. */
-public class TestingBufferAccumulator implements BufferAccumulator {
-
- @Override
- public void setup(
- int numSubpartitions,
- BiConsumer<TieredStorageSubpartitionId, List<Buffer>>
bufferFlusher) {}
+/**
+ * This interface is used by {@link HashSubpartitionBufferAccumulator} to operate {@link
+ * HashBufferAccumulator}. It gives a per-subpartition accumulator the two operations it needs
+ * from its owner: obtaining an empty buffer to write into, and handing over finished buffers.
+ */
+public interface HashSubpartitionBufferAccumulatorContext {
- @Override
- public void receive(
- ByteBuffer record, TieredStorageSubpartitionId subpartitionId,
Buffer.DataType dataType)
- throws IOException {}
+ /**
+ * Request {@link BufferBuilder} from the {@link BufferPool}. {@link HashBufferAccumulator}
+ * implements this by delegating to its {@link TieredStorageMemoryManager}.
+ *
+ * <p>NOTE(review): the method name indicates that the call blocks until a buffer is
+ * available; that behavior is defined by the implementation - confirm there.
+ *
+ * @return the requested buffer
+ */
+ BufferBuilder requestBufferBlocking();
- @Override
- public void close() {}
+ /**
+ * Flush the accumulated {@link Buffer}s of the subpartition. {@link
+ * HashSubpartitionBufferAccumulator} calls this with a single-element list each time one
+ * buffer is finished.
+ *
+ * @param subpartitionId the subpartition id
+ * @param accumulatedBuffers the accumulated buffers
+ */
+ void flushAccumulatedBuffers(
+ TieredStorageSubpartitionId subpartitionId, List<Buffer>
accumulatedBuffers);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 34a0f03f1e1..9a87ec91ff0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -54,7 +54,7 @@ public class TieredStorageProducerClient {
this.bufferCompressor = bufferCompressor;
this.tierProducerAgents = tierProducerAgents;
- bufferAccumulator.setup(numSubpartitions,
this::writeAccumulatedBuffers);
+ bufferAccumulator.setup(this::writeAccumulatedBuffers);
}
/**
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
index 9430f5f3b2e..adc9481f4b3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
@@ -31,9 +31,7 @@ import java.util.function.BiConsumer;
public class TestingBufferAccumulator implements BufferAccumulator {
@Override
- public void setup(
- int numSubpartitions,
- BiConsumer<TieredStorageSubpartitionId, List<Buffer>>
bufferFlusher) {}
+ public void setup(BiConsumer<TieredStorageSubpartitionId, List<Buffer>>
bufferFlusher) {}
@Override
public void receive(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulatorTest.java
new file mode 100644
index 00000000000..4cc8c3e3d16
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashBufferAccumulatorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link HashBufferAccumulator}.
+ *
+ * <p>Every test uses a single subpartition and a memory manager backed by a local buffer pool of
+ * 10 buffers, taken from a {@link NetworkBufferPool} that is created before and destroyed after
+ * each test. The tests cover:
+ *
+ * <ul>
+ *   <li>Buffer counting: 1000 randomly chosen data records and events are received, the last
+ *       one always being an event. The number of flushed buffers must equal, for each run of
+ *       data records between two events, the run's total bytes divided by the buffer size and
+ *       rounded up, plus one buffer per event.
+ *   <li>Events must not request any buffer from the memory manager.
+ *   <li>Closing the accumulator while a data record is still pending in an unfinished buffer
+ *       must throw an {@link IllegalStateException} mentioning "There are unfinished buffers".
+ * </ul>
+ */
+class HashBufferAccumulatorTest {
+
+ public static final int NUM_TOTAL_BUFFERS = 1000;
+
+ public static final int NETWORK_BUFFER_SIZE = 1024;
+
+ private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+ private NetworkBufferPool globalPool;
+
+ @BeforeEach
+ void before() {
+ globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS,
NETWORK_BUFFER_SIZE);
+ }
+
+ @AfterEach
+ void after() {
+ globalPool.destroy();
+ }
+
+ @Test
+ void testAccumulateRecordsAndGenerateFinishedBuffers() throws IOException {
+ int numBuffers = 10;
+ int numRecords = 1000;
+ TieredStorageSubpartitionId subpartitionId = new
TieredStorageSubpartitionId(0);
+ Random random = new Random();
+
+ TieredStorageMemoryManager tieredStorageMemoryManager =
+ createStorageMemoryManager(numBuffers);
+ try (HashBufferAccumulator bufferAccumulator =
+ new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE,
tieredStorageMemoryManager)) {
+ AtomicInteger numReceivedFinishedBuffer = new AtomicInteger(0);
+ bufferAccumulator.setup(
+ ((subpartition, buffers) ->
+ buffers.forEach(
+ buffer -> {
+
numReceivedFinishedBuffer.incrementAndGet();
+ buffer.recycleBuffer();
+ })));
+
+ int numRecordBytesSinceLastEvent = 0;
+ int numExpectBuffers = 0;
+ for (int i = 0; i < numRecords; i++) {
+ boolean isBuffer = random.nextBoolean() && i != numRecords - 1;
+ ByteBuffer record;
+ Buffer.DataType dataType =
+ isBuffer ? Buffer.DataType.DATA_BUFFER :
Buffer.DataType.EVENT_BUFFER;
+ if (isBuffer) {
+ int numBytes = random.nextInt(2 * NETWORK_BUFFER_SIZE) + 1;
+ numRecordBytesSinceLastEvent += numBytes;
+ record = generateRandomData(numBytes, random);
+ } else {
+ numExpectBuffers +=
+ numRecordBytesSinceLastEvent / NETWORK_BUFFER_SIZE
+ + (numRecordBytesSinceLastEvent %
NETWORK_BUFFER_SIZE == 0
+ ? 0
+ : 1);
+ record =
EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
+ numExpectBuffers++;
+ numRecordBytesSinceLastEvent = 0;
+ }
+ bufferAccumulator.receive(record, subpartitionId, dataType);
+ }
+
+
assertThat(numReceivedFinishedBuffer.get()).isEqualTo(numExpectBuffers);
+ }
+ }
+
+ @Test
+ void testEventShouldNotRequestBufferFromMemoryManager() throws IOException
{
+ int numBuffers = 10;
+
+ TieredStorageMemoryManager tieredStorageMemoryManager =
+ createStorageMemoryManager(numBuffers);
+ try (HashBufferAccumulator bufferAccumulator =
+ new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE,
tieredStorageMemoryManager)) {
+ bufferAccumulator.setup(
+ ((subpartition, buffers) ->
buffers.forEach(Buffer::recycleBuffer)));
+
+ ByteBuffer endEvent =
EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
+ bufferAccumulator.receive(
+ endEvent, new TieredStorageSubpartitionId(0),
Buffer.DataType.EVENT_BUFFER);
+
+
assertThat(tieredStorageMemoryManager.numOwnerRequestedBuffer(bufferAccumulator))
+ .isZero();
+ }
+ }
+
+ @Test
+ void testCloseWithUnFinishedBuffers() throws IOException {
+ int numBuffers = 10;
+
+ TieredStorageMemoryManager tieredStorageMemoryManager =
+ createStorageMemoryManager(numBuffers);
+ assertThatThrownBy(
+ () -> {
+ try (HashBufferAccumulator bufferAccumulator =
+ new HashBufferAccumulator(
+ 1, NETWORK_BUFFER_SIZE,
tieredStorageMemoryManager)) {
+ bufferAccumulator.setup(
+ ((subpartition, buffers) ->
+
buffers.forEach(Buffer::recycleBuffer)));
+ bufferAccumulator.receive(
+ generateRandomData(1, new Random()),
+ new TieredStorageSubpartitionId(0),
+ Buffer.DataType.DATA_BUFFER);
+ }
+ })
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("There are unfinished buffers");
+ }
+
+ private TieredStorageMemoryManagerImpl createStorageMemoryManager(int
numBuffersInBufferPool)
+ throws IOException {
+ BufferPool bufferPool =
+ globalPool.createBufferPool(numBuffersInBufferPool,
numBuffersInBufferPool);
+ TieredStorageMemoryManagerImpl storageMemoryManager =
+ new
TieredStorageMemoryManagerImpl(NUM_BUFFERS_TRIGGER_FLUSH_RATIO, true);
+ storageMemoryManager.setup(
+ bufferPool, Collections.singletonList(new
TieredStorageMemorySpec(this, 1)));
+ return storageMemoryManager;
+ }
+
+ private static ByteBuffer generateRandomData(int dataSize, Random random) {
+ byte[] dataWritten = new byte[dataSize];
+ random.nextBytes(dataWritten);
+ return ByteBuffer.wrap(dataWritten);
+ }
+}