This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2b402d66de9ecde75881796a6d7ee09fc4d6963
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Oct 24 18:16:01 2022 +0800

    [FLINK-28889] Introduce HsSubpartitionConsumerMemoryDataManager and let 
HsMemoryDataManager support multiple consumers.
---
 .../io/network/partition/hybrid/HsDataView.java    |   4 +-
 .../partition/hybrid/HsFileDataManager.java        |   3 +-
 .../partition/hybrid/HsMemoryDataManager.java      |  60 ++++---
 .../hybrid/HsMemoryDataManagerOperation.java       |  15 +-
 .../partition/hybrid/HsResultPartition.java        |   6 +-
 .../hybrid/HsSelectiveSpillingStrategy.java        |   3 +-
 .../partition/hybrid/HsSpillingInfoProvider.java   |  10 +-
 ...titionView.java => HsSubpartitionConsumer.java} |  20 ++-
 ... HsSubpartitionConsumerInternalOperations.java} |   6 +-
 .../HsSubpartitionConsumerMemoryDataManager.java   | 185 +++++++++++++++++++
 .../partition/hybrid/HsSubpartitionFileReader.java |   2 +-
 .../hybrid/HsSubpartitionFileReaderImpl.java       |   6 +-
 .../hybrid/HsSubpartitionMemoryDataManager.java    | 173 ++++++------------
 .../partition/hybrid/HsFileDataManagerTest.java    |  10 +-
 .../partition/hybrid/HsMemoryDataManagerTest.java  |  20 +++
 ...sSubpartitionConsumerMemoryDataManagerTest.java | 197 +++++++++++++++++++++
 .../hybrid/HsSubpartitionFileReaderImplTest.java   |  44 ++---
 .../HsSubpartitionMemoryDataManagerTest.java       | 132 ++++----------
 .../partition/hybrid/HsSubpartitionViewTest.java   |  60 ++++---
 .../hybrid/TestingMemoryDataManagerOperation.java  |  26 ++-
 .../hybrid/TestingSpillingInfoProvider.java        |   2 +-
 ...tingSubpartitionConsumerInternalOperation.java} |   6 +-
 22 files changed, 669 insertions(+), 321 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
index c3d4cb9ed83..b8888cfcb38 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
@@ -24,8 +24,8 @@ import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAn
 import java.util.Optional;
 
 /**
- * A view for {@link HsSubpartitionView} to find out what data exists in 
memory or disk and polling
- * the data.
+ * A view for {@link HsSubpartitionConsumer} to find out what data exists in 
memory or disk and
+ * polling the data.
  */
 public interface HsDataView {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 75d322f6452..82db961be9c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -153,7 +153,8 @@ public class HsFileDataManager implements Runnable, 
BufferRecycler {
 
     /** This method only called by result partition to create 
subpartitionFileReader. */
     public HsDataView registerNewSubpartition(
-            int subpartitionId, HsSubpartitionViewInternalOperations 
operation) throws IOException {
+            int subpartitionId, HsSubpartitionConsumerInternalOperations 
operation)
+            throws IOException {
         synchronized (lock) {
             checkState(!isReleased, "HsFileDataManager is already released.");
             lazyInitialize();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index a8d7d89e7e4..da0c3962ad2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.SupplierWithException;
@@ -35,6 +36,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
@@ -72,8 +74,12 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
-    private final Map<Integer, HsSubpartitionViewInternalOperations> 
subpartitionViewOperationsMap =
-            new ConcurrentHashMap<>();
+    /**
+     * Each element of the list is all views of the subpartition corresponding 
to its index, which
+     * are stored in the form of a map that maps consumer id to its 
subpartition view.
+     */
+    private final List<Map<HsConsumerId, 
HsSubpartitionConsumerInternalOperations>>
+            subpartitionViewOperationsMap;
 
     /**
      * Currently, it is only used to regularly check the actual size of local 
buffer pool (the size
@@ -106,6 +112,7 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
         ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
         this.lock = readWriteLock.writeLock();
 
+        this.subpartitionViewOperationsMap = new ArrayList<>(numSubpartitions);
         for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
++subpartitionId) {
             subpartitionMemoryDataManagers[subpartitionId] =
                     new HsSubpartitionMemoryDataManager(
@@ -114,6 +121,7 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
                             readWriteLock.readLock(),
                             bufferCompressor,
                             this);
+            subpartitionViewOperationsMap.add(new ConcurrentHashMap<>());
         }
 
         poolSize = new AtomicInteger(this.bufferPool.getNumBuffers());
@@ -156,20 +164,19 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
     }
 
     /**
-     * Register {@link HsSubpartitionViewInternalOperations} to {@link
+     * Register {@link HsSubpartitionConsumerInternalOperations} to {@link
      * #subpartitionViewOperationsMap}. It is used to obtain the consumption 
progress of the
      * subpartition.
      */
-    public HsDataView registerSubpartitionView(
-            int subpartitionId, HsSubpartitionViewInternalOperations 
viewOperations) {
-        HsSubpartitionViewInternalOperations oldView =
-                subpartitionViewOperationsMap.put(subpartitionId, 
viewOperations);
-        if (oldView != null) {
-            LOG.debug(
-                    "subpartition : {} register subpartition view will replace 
old view. ",
-                    subpartitionId);
-        }
-        return getSubpartitionMemoryDataManager(subpartitionId);
+    public HsDataView registerNewConsumer(
+            int subpartitionId,
+            HsConsumerId consumerId,
+            HsSubpartitionConsumerInternalOperations viewOperations) {
+        HsSubpartitionConsumerInternalOperations oldView =
+                
subpartitionViewOperationsMap.get(subpartitionId).put(consumerId, 
viewOperations);
+        Preconditions.checkState(
+                oldView == null, "Each subpartition view should have unique 
consumerId.");
+        return 
getSubpartitionMemoryDataManager(subpartitionId).registerNewConsumer(consumerId);
     }
 
     /** Close this {@link HsMemoryDataManager}, it means no data can append to 
memory. */
@@ -232,11 +239,11 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
 
     // Write lock should be acquired before invoke this method.
     @Override
-    public List<Integer> getNextBufferIndexToConsume() {
+    public List<Integer> getNextBufferIndexToConsume(HsConsumerId consumerId) {
         ArrayList<Integer> consumeIndexes = new ArrayList<>(numSubpartitions);
         for (int channel = 0; channel < numSubpartitions; channel++) {
-            HsSubpartitionViewInternalOperations viewOperation =
-                    subpartitionViewOperationsMap.get(channel);
+            HsSubpartitionConsumerInternalOperations viewOperation =
+                    subpartitionViewOperationsMap.get(channel).get(consumerId);
             // Access consuming offset without lock to prevent deadlock.
             // A consuming thread may being blocked on the memory data manager 
lock, while holding
             // the viewOperation lock.
@@ -280,12 +287,23 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
     }
 
     @Override
-    public void onDataAvailable(int subpartitionId) {
-        HsSubpartitionViewInternalOperations 
subpartitionViewInternalOperations =
+    public void onDataAvailable(int subpartitionId, Collection<HsConsumerId> 
consumerIds) {
+        Map<HsConsumerId, HsSubpartitionConsumerInternalOperations> 
consumerViewMap =
                 subpartitionViewOperationsMap.get(subpartitionId);
-        if (subpartitionViewInternalOperations != null) {
-            subpartitionViewInternalOperations.notifyDataAvailable();
-        }
+        consumerIds.forEach(
+                consumerId -> {
+                    HsSubpartitionConsumerInternalOperations consumerView =
+                            consumerViewMap.get(consumerId);
+                    if (consumerView != null) {
+                        consumerView.notifyDataAvailable();
+                    }
+                });
+    }
+
+    @Override
+    public void onConsumerReleased(int subpartitionId, HsConsumerId 
consumerId) {
+        subpartitionViewOperationsMap.get(subpartitionId).remove(consumerId);
+        
getSubpartitionMemoryDataManager(subpartitionId).releaseConsumer(consumerId);
     }
 
     // ------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
index 37df7709d63..794e95b98ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
+import java.util.Collection;
+
 /**
  * This interface is used by {@link HsSubpartitionMemoryDataManager} to 
operate {@link
  * HsMemoryDataManager}. Spilling decision may be made and handled inside 
these operations.
@@ -53,7 +55,16 @@ public interface HsMemoryDataManagerOperation {
     /**
      * This method is called when subpartition data become available.
      *
-     * @param subpartitionId the subpartition need notify data available.
+     * @param subpartitionId the subpartition's identifier that this consumer 
belongs to.
+     * @param consumerIds the consumer's identifier which need notify data 
available.
+     */
+    void onDataAvailable(int subpartitionId, Collection<HsConsumerId> 
consumerIds);
+
+    /**
+     * This method is called when consumer is decided to released.
+     *
+     * @param subpartitionId the subpartition's identifier that this consumer 
belongs to.
+     * @param consumerId the consumer's identifier which decided to be 
released.
      */
-    void onDataAvailable(int subpartitionId);
+    void onConsumerReleased(int subpartitionId, HsConsumerId consumerId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 9b75f1e3724..5e10b773397 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -186,13 +186,15 @@ public class HsResultPartition extends ResultPartition {
             throw new PartitionNotFoundException(getPartitionId());
         }
 
-        HsSubpartitionView subpartitionView = new 
HsSubpartitionView(availabilityListener);
+        HsSubpartitionConsumer subpartitionView = new 
HsSubpartitionConsumer(availabilityListener);
         HsDataView diskDataView =
                 fileDataManager.registerNewSubpartition(subpartitionId, 
subpartitionView);
 
         HsDataView memoryDataView =
                 checkNotNull(memoryDataManager)
-                        .registerSubpartitionView(subpartitionId, 
subpartitionView);
+                        // TODO pass real consumerId in the next commit.
+                        .registerNewConsumer(
+                                subpartitionId, HsConsumerId.DEFAULT, 
subpartitionView);
 
         subpartitionView.setDiskDataView(diskDataView);
         subpartitionView.setMemoryDataView(memoryDataView);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
index 8c7217a4b93..56cf0a39f4a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
@@ -95,7 +95,8 @@ public class HsSelectiveSpillingStrategy implements 
HsSpillingStrategy {
 
         TreeMap<Integer, List<BufferIndexAndChannel>> 
subpartitionToHighPriorityBuffers =
                 getBuffersByConsumptionPriorityInOrder(
-                        spillingInfoProvider.getNextBufferIndexToConsume(),
+                        // selective spilling strategy does not support 
multiple consumer.
+                        
spillingInfoProvider.getNextBufferIndexToConsume(HsConsumerId.DEFAULT),
                         subpartitionToBuffers,
                         spillNum);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java
index deb32e7058e..7c6b0ccd70d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java
@@ -31,12 +31,14 @@ public interface HsSpillingInfoProvider {
     int getNumSubpartitions();
 
     /**
-     * Get all downstream next buffer index to consume.
+     * Get all subpartition's next buffer index to consume of specific 
consumer.
      *
-     * @return A list containing all downstream next buffer index to consume, 
if the downstream
-     *     subpartition view has not been registered, the corresponding return 
value is -1.
+     * @param consumerId of the target downstream consumer.
+     * @return A list containing all subpartition's next buffer index to 
consume of specific
+     *     consumer, if the downstream subpartition view has not been 
registered, the corresponding
+     *     return value is -1.
      */
-    List<Integer> getNextBufferIndexToConsume();
+    List<Integer> getNextBufferIndexToConsume(HsConsumerId consumerId);
 
     /**
      * Get all buffers with the expected status from the subpartition.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumer.java
similarity index 94%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumer.java
index bc9ec5a1744..7878d68bc33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumer.java
@@ -33,8 +33,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** The read view of HsResultPartition, data can be read from memory or disk. 
*/
-public class HsSubpartitionView
-        implements ResultSubpartitionView, 
HsSubpartitionViewInternalOperations {
+public class HsSubpartitionConsumer
+        implements ResultSubpartitionView, 
HsSubpartitionConsumerInternalOperations {
     private final BufferAvailabilityListener availabilityListener;
     private final Object lock = new Object();
 
@@ -66,7 +66,7 @@ public class HsSubpartitionView
     // memoryDataView can be null only before initialization.
     private HsDataView memoryDataView;
 
-    public HsSubpartitionView(BufferAvailabilityListener availabilityListener) 
{
+    public HsSubpartitionConsumer(BufferAvailabilityListener 
availabilityListener) {
         this.availabilityListener = availabilityListener;
     }
 
@@ -270,21 +270,25 @@ public class HsSubpartitionView
     }
 
     private void releaseInternal(@Nullable Throwable throwable) {
-        boolean releaseSubpartitionReader = false;
+        boolean releaseDiskView;
+        boolean releaseMemoryView;
         synchronized (lock) {
             if (isReleased) {
                 return;
             }
             isReleased = true;
             failureCause = throwable;
-            if (diskDataView != null) {
-                releaseSubpartitionReader = true;
-            }
+            releaseDiskView = diskDataView != null;
+            releaseMemoryView = memoryDataView != null;
         }
         // release subpartition reader outside of lock to avoid deadlock.
-        if (releaseSubpartitionReader) {
+        if (releaseDiskView) {
             //noinspection FieldAccessNotGuarded
             diskDataView.releaseDataView();
         }
+        if (releaseMemoryView) {
+            //noinspection FieldAccessNotGuarded
+            memoryDataView.releaseDataView();
+        }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerInternalOperations.java
similarity index 87%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerInternalOperations.java
index ab967e3d450..3f2e58507f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerInternalOperations.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
 /**
- * Operations provided by HsSubpartitionView that are used by other internal 
components of hybrid
- * result partition.
+ * Operations provided by {@link HsSubpartitionConsumer} that are used by 
other internal components
+ * of hybrid result partition.
  */
-public interface HsSubpartitionViewInternalOperations {
+public interface HsSubpartitionConsumerInternalOperations {
 
     /** Callback for new data become available. */
     void notifyDataAvailable();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java
new file mode 100644
index 00000000000..606ffa32bbf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for managing the data of a single consumer. {@link
+ * HsSubpartitionMemoryDataManager} will create a new {@link
+ * HsSubpartitionConsumerMemoryDataManager} when a consumer is registered.
+ */
+public class HsSubpartitionConsumerMemoryDataManager implements HsDataView {
+
+    @GuardedBy("consumerLock")
+    private final Deque<HsBufferContext> unConsumedBuffers = new 
LinkedList<>();
+
+    private final Lock consumerLock;
+
+    private final Lock resultPartitionLock;
+
+    private final HsConsumerId consumerId;
+
+    private final int subpartitionId;
+
+    private final HsMemoryDataManagerOperation memoryDataManagerOperation;
+
+    public HsSubpartitionConsumerMemoryDataManager(
+            Lock resultPartitionLock,
+            Lock consumerLock,
+            int subpartitionId,
+            HsConsumerId consumerId,
+            HsMemoryDataManagerOperation memoryDataManagerOperation) {
+        this.resultPartitionLock = resultPartitionLock;
+        this.consumerLock = consumerLock;
+        this.subpartitionId = subpartitionId;
+        this.consumerId = consumerId;
+        this.memoryDataManagerOperation = memoryDataManagerOperation;
+    }
+
+    @GuardedBy("consumerLock")
+    // this method only called from subpartitionMemoryDataManager with write 
lock.
+    public void addInitialBuffers(Deque<HsBufferContext> buffers) {
+        unConsumedBuffers.addAll(buffers);
+    }
+
+    @GuardedBy("consumerLock")
+    // this method only called from subpartitionMemoryDataManager with write 
lock.
+    public boolean addBuffer(HsBufferContext bufferContext) {
+        unConsumedBuffers.add(bufferContext);
+        trimHeadingReleasedBuffers();
+        return unConsumedBuffers.size() <= 1;
+    }
+
+    /**
+     * Check whether the head of {@link #unConsumedBuffers} is the buffer to 
be consumed. If so,
+     * return the buffer and backlog.
+     *
+     * @param toConsumeIndex index of buffer to be consumed.
+     * @return If the head of {@link #unConsumedBuffers} is target, return 
optional of the buffer
+     *     and backlog. Otherwise, return {@link Optional#empty()}.
+     */
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Note that: callWithLock ensure that code block guarded by 
resultPartitionReadLock and
+    // subpartitionLock.
+    @Override
+    public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int 
toConsumeIndex) {
+        Optional<Tuple2<HsBufferContext, Buffer.DataType>> 
bufferAndNextDataType =
+                callWithLock(
+                        () -> {
+                            if 
(!checkFirstUnConsumedBufferIndex(toConsumeIndex)) {
+                                return Optional.empty();
+                            }
+
+                            HsBufferContext bufferContext =
+                                    
checkNotNull(unConsumedBuffers.pollFirst());
+                            bufferContext.consumed(consumerId);
+                            Buffer.DataType nextDataType =
+                                    
peekNextToConsumeDataTypeInternal(toConsumeIndex + 1);
+                            return Optional.of(Tuple2.of(bufferContext, 
nextDataType));
+                        });
+
+        bufferAndNextDataType.ifPresent(
+                tuple ->
+                        memoryDataManagerOperation.onBufferConsumed(
+                                tuple.f0.getBufferIndexAndChannel()));
+        return bufferAndNextDataType.map(
+                tuple ->
+                        new ResultSubpartition.BufferAndBacklog(
+                                tuple.f0.getBuffer().readOnlySlice(),
+                                getBacklog(),
+                                tuple.f1,
+                                toConsumeIndex));
+    }
+
+    /**
+     * Check whether the head of {@link #unConsumedBuffers} is the buffer to 
be consumed next time.
+     * If so, return the next buffer's data type.
+     *
+     * @param nextToConsumeIndex index of the buffer to be consumed next time.
+     * @return If the head of {@link #unConsumedBuffers} is target, return the 
buffer's data type.
+     *     Otherwise, return {@link Buffer.DataType#NONE}.
+     */
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Note that: callWithLock ensure that code block guarded by 
resultPartitionReadLock and
+    // consumerLock.
+    @Override
+    public Buffer.DataType peekNextToConsumeDataType(int nextToConsumeIndex) {
+        return callWithLock(() -> 
peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
+    }
+
+    @GuardedBy("consumerLock")
+    private Buffer.DataType peekNextToConsumeDataTypeInternal(int 
nextToConsumeIndex) {
+        return checkFirstUnConsumedBufferIndex(nextToConsumeIndex)
+                ? 
checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType()
+                : Buffer.DataType.NONE;
+    }
+
+    @GuardedBy("consumerLock")
+    private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) {
+        trimHeadingReleasedBuffers();
+        return !unConsumedBuffers.isEmpty()
+                && 
unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex()
+                        == expectedBufferIndex;
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    // Un-synchronized get unConsumedBuffers size to provide memory data 
backlog,this will make the
+    // result greater than or equal to the actual backlog, but obtaining an 
accurate backlog will
+    // bring too much extra overhead.
+    @Override
+    public int getBacklog() {
+        return unConsumedBuffers.size();
+    }
+
+    @Override
+    public void releaseDataView() {
+        memoryDataManagerOperation.onConsumerReleased(subpartitionId, 
consumerId);
+    }
+
+    @GuardedBy("consumerLock")
+    private void trimHeadingReleasedBuffers() {
+        while (!unConsumedBuffers.isEmpty() && 
unConsumedBuffers.peekFirst().isReleased()) {
+            unConsumedBuffers.removeFirst();
+        }
+    }
+
+    private <R, E extends Exception> R callWithLock(SupplierWithException<R, 
E> callable) throws E {
+        try {
+            resultPartitionLock.lock();
+            consumerLock.lock();
+            return callable.get();
+        } finally {
+            consumerLock.unlock();
+            resultPartitionLock.unlock();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
index 2ec2a632281..c87bb237660 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
@@ -58,7 +58,7 @@ public interface HsSubpartitionFileReader extends 
Comparable<HsSubpartitionFileR
         HsSubpartitionFileReader createFileReader(
                 int subpartitionId,
                 FileChannel dataFileChannel,
-                HsSubpartitionViewInternalOperations operation,
+                HsSubpartitionConsumerInternalOperations operation,
                 HsFileDataIndex dataIndex,
                 int maxBuffersReadAhead,
                 Consumer<HsSubpartitionFileReader> fileReaderReleaser,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index e6dc7122c5e..de7bd599e14 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -57,7 +57,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
 
     private final FileChannel dataFileChannel;
 
-    private final HsSubpartitionViewInternalOperations operations;
+    private final HsSubpartitionConsumerInternalOperations operations;
 
     private final CachedRegionManager cachedRegionManager;
 
@@ -72,7 +72,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
     public HsSubpartitionFileReaderImpl(
             int subpartitionId,
             FileChannel dataFileChannel,
-            HsSubpartitionViewInternalOperations operations,
+            HsSubpartitionConsumerInternalOperations operations,
             HsFileDataIndex dataIndex,
             int maxBufferReadAhead,
             Consumer<HsSubpartitionFileReader> fileReaderReleaser,
@@ -487,7 +487,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
         public HsSubpartitionFileReader createFileReader(
                 int subpartitionId,
                 FileChannel dataFileChannel,
-                HsSubpartitionViewInternalOperations operation,
+                HsSubpartitionConsumerInternalOperations operation,
                 HsFileDataIndex dataIndex,
                 int maxBuffersReadAhead,
                 Consumer<HsSubpartitionFileReader> fileReaderReleaser,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index f6804a62b1b..d01cb0d2c78 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -28,7 +27,6 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatusWithId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
@@ -40,6 +38,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -49,16 +48,18 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This class is responsible for managing the data in a single subpartition. 
One {@link
  * HsMemoryDataManager} will hold multiple {@link 
HsSubpartitionMemoryDataManager}.
  */
-public class HsSubpartitionMemoryDataManager implements HsDataView {
+public class HsSubpartitionMemoryDataManager {
     private final int targetChannel;
 
     private final int bufferSize;
@@ -74,9 +75,6 @@ public class HsSubpartitionMemoryDataManager implements 
HsDataView {
     @GuardedBy("subpartitionLock")
     private final Deque<HsBufferContext> allBuffers = new LinkedList<>();
 
-    @GuardedBy("subpartitionLock")
-    private final Deque<HsBufferContext> unConsumedBuffers = new 
LinkedList<>();
-
     @GuardedBy("subpartitionLock")
     private final Map<Integer, HsBufferContext> bufferIndexToContexts = new 
HashMap<>();
 
@@ -84,7 +82,10 @@ public class HsSubpartitionMemoryDataManager implements 
HsDataView {
     private final Lock resultPartitionLock;
 
     /** DO NOT USE DIRECTLY. Use {@link #runWithLock} or {@link #callWithLock} 
instead. */
-    private final Object subpartitionLock = new Object();
+    private final ReentrantReadWriteLock subpartitionLock = new 
ReentrantReadWriteLock();
+
+    @GuardedBy("subpartitionLock")
+    private final Map<HsConsumerId, HsSubpartitionConsumerMemoryDataManager> 
consumerMap;
 
     @Nullable private final BufferCompressor bufferCompressor;
 
@@ -101,82 +102,7 @@ public class HsSubpartitionMemoryDataManager implements 
HsDataView {
         this.resultPartitionLock = resultPartitionLock;
         this.memoryDataManagerOperation = memoryDataManagerOperation;
         this.bufferCompressor = bufferCompressor;
-    }
-
-    // ------------------------------------------------------------------------
-    //  Called by Consumer
-    // ------------------------------------------------------------------------
-
-    /**
-     * Check whether the head of {@link #unConsumedBuffers} is the buffer to 
be consumed next time.
-     * If so, return the next buffer's data type.
-     *
-     * @param nextToConsumeIndex index of the buffer to be consumed next time.
-     * @return If the head of {@link #unConsumedBuffers} is target, return the 
buffer's data type.
-     *     Otherwise, return {@link DataType#NONE}.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    // Note that: callWithLock ensure that code block guarded by 
resultPartitionReadLock and
-    // subpartitionLock.
-    @Override
-    public DataType peekNextToConsumeDataType(int nextToConsumeIndex) {
-        return callWithLock(() -> 
peekNextToConsumeDataTypeInternal(nextToConsumeIndex));
-    }
-
-    /**
-     * Check whether the head of {@link #unConsumedBuffers} is the buffer to 
be consumed. If so,
-     * return the buffer and backlog.
-     *
-     * @param toConsumeIndex index of buffer to be consumed.
-     * @return If the head of {@link #unConsumedBuffers} is target, return 
optional of the buffer
-     *     and backlog. Otherwise, return {@link Optional#empty()}.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    // Note that: callWithLock ensure that code block guarded by 
resultPartitionReadLock and
-    // subpartitionLock.
-    @Override
-    public Optional<BufferAndBacklog> consumeBuffer(int toConsumeIndex) {
-        Optional<Tuple2<HsBufferContext, DataType>> bufferAndNextDataType =
-                callWithLock(
-                        () -> {
-                            if 
(!checkFirstUnConsumedBufferIndex(toConsumeIndex)) {
-                                return Optional.empty();
-                            }
-
-                            HsBufferContext bufferContext =
-                                    
checkNotNull(unConsumedBuffers.pollFirst());
-                            // TODO move this logical to consumer and pass 
real consumerId.
-                            bufferContext.consumed(HsConsumerId.DEFAULT);
-                            DataType nextDataType =
-                                    
peekNextToConsumeDataTypeInternal(toConsumeIndex + 1);
-                            return Optional.of(Tuple2.of(bufferContext, 
nextDataType));
-                        });
-
-        bufferAndNextDataType.ifPresent(
-                tuple ->
-                        memoryDataManagerOperation.onBufferConsumed(
-                                tuple.f0.getBufferIndexAndChannel()));
-        return bufferAndNextDataType.map(
-                tuple ->
-                        new BufferAndBacklog(
-                                tuple.f0.getBuffer().readOnlySlice(),
-                                getBacklog(),
-                                tuple.f1,
-                                toConsumeIndex));
-    }
-
-    @SuppressWarnings("FieldAccessNotGuarded")
-    // Un-synchronized get unConsumedBuffers size to provide memory data 
backlog,this will make the
-    // result greater than or equal to the actual backlog, but obtaining an 
accurate backlog will
-    // bring too much extra overhead.
-    @Override
-    public int getBacklog() {
-        return unConsumedBuffers.size();
-    }
-
-    @Override
-    public void releaseDataView() {
-        // nothing to do for memory data.
+        this.consumerMap = new HashMap<>();
     }
 
     // ------------------------------------------------------------------------
@@ -285,6 +211,29 @@ public class HsSubpartitionMemoryDataManager implements 
HsDataView {
         this.outputMetrics = checkNotNull(outputMetrics);
     }
 
+    @SuppressWarnings("FieldAccessNotGuarded")
+    public HsSubpartitionConsumerMemoryDataManager 
registerNewConsumer(HsConsumerId consumerId) {
+        return callWithLock(
+                () -> {
+                    checkState(!consumerMap.containsKey(consumerId));
+                    HsSubpartitionConsumerMemoryDataManager newConsumer =
+                            new HsSubpartitionConsumerMemoryDataManager(
+                                    resultPartitionLock,
+                                    subpartitionLock.readLock(),
+                                    targetChannel,
+                                    consumerId,
+                                    memoryDataManagerOperation);
+                    newConsumer.addInitialBuffers(allBuffers);
+                    consumerMap.put(consumerId, newConsumer);
+                    return newConsumer;
+                });
+    }
+
+    @SuppressWarnings("FieldAccessNotGuarded")
+    public void releaseConsumer(HsConsumerId consumerId) {
+        runWithLock(() -> checkNotNull(consumerMap.remove(consumerId)));
+    }
+
     // ------------------------------------------------------------------------
     //  Internal Methods
     // ------------------------------------------------------------------------
@@ -393,36 +342,22 @@ public class HsSubpartitionMemoryDataManager implements 
HsDataView {
     // subpartitionLock.
     private void addFinishedBuffer(HsBufferContext bufferContext) {
         finishedBufferIndex++;
-        boolean needNotify =
-                callWithLock(
-                        () -> {
-                            allBuffers.add(bufferContext);
-                            unConsumedBuffers.add(bufferContext);
-                            bufferIndexToContexts.put(
-                                    
bufferContext.getBufferIndexAndChannel().getBufferIndex(),
-                                    bufferContext);
-                            trimHeadingReleasedBuffers(unConsumedBuffers);
-                            updateStatistics(bufferContext.getBuffer());
-                            return unConsumedBuffers.size() <= 1;
-                        });
-        if (needNotify) {
-            memoryDataManagerOperation.onDataAvailable(targetChannel);
-        }
-    }
-
-    @GuardedBy("subpartitionLock")
-    private DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) 
{
-        return checkFirstUnConsumedBufferIndex(nextToConsumeIndex)
-                ? 
checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType()
-                : DataType.NONE;
-    }
-
-    @GuardedBy("subpartitionLock")
-    private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) {
-        trimHeadingReleasedBuffers(unConsumedBuffers);
-        return !unConsumedBuffers.isEmpty()
-                && 
unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex()
-                        == expectedBufferIndex;
+        List<HsConsumerId> needNotify = new ArrayList<>(consumerMap.size());
+        runWithLock(
+                () -> {
+                    allBuffers.add(bufferContext);
+                    bufferIndexToContexts.put(
+                            
bufferContext.getBufferIndexAndChannel().getBufferIndex(),
+                            bufferContext);
+                    for (Map.Entry<HsConsumerId, 
HsSubpartitionConsumerMemoryDataManager>
+                            consumerEntry : consumerMap.entrySet()) {
+                        if (consumerEntry.getValue().addBuffer(bufferContext)) 
{
+                            needNotify.add(consumerEntry.getKey());
+                        }
+                    }
+                    updateStatistics(bufferContext.getBuffer());
+                });
+        memoryDataManagerOperation.onDataAvailable(targetChannel, needNotify);
     }
 
     /**
@@ -517,10 +452,10 @@ public class HsSubpartitionMemoryDataManager implements 
HsDataView {
     private <E extends Exception> void runWithLock(ThrowingRunnable<E> 
runnable) throws E {
         try {
             resultPartitionLock.lock();
-            synchronized (subpartitionLock) {
-                runnable.run();
-            }
+            subpartitionLock.writeLock().lock();
+            runnable.run();
         } finally {
+            subpartitionLock.writeLock().unlock();
             resultPartitionLock.unlock();
         }
     }
@@ -528,10 +463,10 @@ public class HsSubpartitionMemoryDataManager implements 
HsDataView {
     private <R, E extends Exception> R callWithLock(SupplierWithException<R, 
E> callable) throws E {
         try {
             resultPartitionLock.lock();
-            synchronized (subpartitionLock) {
-                return callable.get();
-            }
+            subpartitionLock.writeLock().lock();
+            return callable.get();
         } finally {
+            subpartitionLock.writeLock().unlock();
             resultPartitionLock.unlock();
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 4c168c13fd6..0980688b7f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -78,7 +78,7 @@ class HsFileDataManagerTest {
 
     private HsFileDataManager fileDataManager;
 
-    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+    private TestingSubpartitionConsumerInternalOperation 
subpartitionViewOperation;
 
     private TestingHsSubpartitionFileReader.Factory factory;
 
@@ -101,7 +101,7 @@ class HsFileDataManagerTest {
                         HybridShuffleConfiguration.builder(
                                         NUM_SUBPARTITIONS, 
bufferPool.getNumBuffersPerRequest())
                                 .build());
-        subpartitionViewOperation = new 
TestingSubpartitionViewInternalOperation();
+        subpartitionViewOperation = new 
TestingSubpartitionConsumerInternalOperation();
     }
 
     @AfterEach
@@ -352,8 +352,8 @@ class HsFileDataManagerTest {
     void testConsumeWhileReleaseNoDeadlock() throws Exception {
         CompletableFuture<Void> consumerStart = new CompletableFuture<>();
         CompletableFuture<Void> readerFail = new CompletableFuture<>();
-        HsSubpartitionView subpartitionView =
-                new HsSubpartitionView(new NoOpBufferAvailablityListener());
+        HsSubpartitionConsumer subpartitionView =
+                new HsSubpartitionConsumer(new 
NoOpBufferAvailablityListener());
 
         HsSubpartitionFileReaderImpl subpartitionFileReader =
                 new HsSubpartitionFileReaderImpl(
@@ -497,7 +497,7 @@ class HsFileDataManagerTest {
             public HsSubpartitionFileReader createFileReader(
                     int subpartitionId,
                     FileChannel dataFileChannel,
-                    HsSubpartitionViewInternalOperations operation,
+                    HsSubpartitionConsumerInternalOperations operation,
                     HsFileDataIndex dataIndex,
                     int maxBuffersReadAhead,
                     Consumer<HsSubpartitionFileReader> fileReaderReleaser,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
index 773b847ecc1..e3c3663af06 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createTestingOutputMetrics;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link HsMemoryDataManager}. */
 class HsMemoryDataManagerTest {
@@ -192,6 +193,25 @@ class HsMemoryDataManagerTest {
         assertThat(resultPartitionReleaseFuture).isCompleted();
     }
 
+    @Test
+    void testSubpartitionConsumerRelease() throws Exception {
+        HsSpillingStrategy spillingStrategy = 
TestingSpillingStrategy.builder().build();
+        HsMemoryDataManager memoryDataManager = 
createMemoryDataManager(spillingStrategy);
+        memoryDataManager.registerNewConsumer(
+                0, HsConsumerId.DEFAULT, new 
TestingSubpartitionConsumerInternalOperation());
+        assertThatThrownBy(
+                        () ->
+                                memoryDataManager.registerNewConsumer(
+                                        0,
+                                        HsConsumerId.DEFAULT,
+                                        new 
TestingSubpartitionConsumerInternalOperation()))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Each subpartition view should have 
unique consumerId.");
+        memoryDataManager.onConsumerReleased(0, HsConsumerId.DEFAULT);
+        memoryDataManager.registerNewConsumer(
+                0, HsConsumerId.DEFAULT, new 
TestingSubpartitionConsumerInternalOperation());
+    }
+
     @Test
     void testPoolSizeCheck() throws Exception {
         final int requiredBuffers = 10;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManagerTest.java
new file mode 100644
index 00000000000..9cb8d8b1751
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManagerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayDeque;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBuffer;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HsSubpartitionConsumerMemoryDataManager}. */
+@SuppressWarnings("FieldAccessNotGuarded")
+class HsSubpartitionConsumerMemoryDataManagerTest {
+
+    private static final int BUFFER_SIZE = Long.BYTES;
+
+    private static final int SUBPARTITION_ID = 0;
+
+    @Test
+    void testPeekNextToConsumeDataTypeNotMeetBufferIndexToConsume() throws 
Exception {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder().build();
+        HsSubpartitionConsumerMemoryDataManager 
subpartitionConsumerMemoryDataManager =
+                
createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation);
+
+        subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, 
false));
+        
assertThat(subpartitionConsumerMemoryDataManager.peekNextToConsumeDataType(1))
+                .isEqualTo(Buffer.DataType.NONE);
+    }
+
+    @Test
+    void testPeekNextToConsumeDataTypeTrimHeadingReleasedBuffers() throws 
Exception {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder().build();
+        HsSubpartitionConsumerMemoryDataManager 
subpartitionConsumerMemoryDataManager =
+                
createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation);
+
+        HsBufferContext buffer1 = createBufferContext(0, false);
+        HsBufferContext buffer2 = createBufferContext(1, false);
+        subpartitionConsumerMemoryDataManager.addBuffer(buffer1);
+        subpartitionConsumerMemoryDataManager.addBuffer(buffer2);
+        subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, 
true));
+
+        buffer1.release();
+        buffer2.release();
+
+        
assertThat(subpartitionConsumerMemoryDataManager.peekNextToConsumeDataType(2))
+                .isEqualTo(Buffer.DataType.EVENT_BUFFER);
+    }
+
+    @Test
+    void testConsumeBufferFirstUnConsumedBufferIndexNotMeetNextToConsume() 
throws Exception {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder().build();
+        HsSubpartitionConsumerMemoryDataManager 
subpartitionConsumerMemoryDataManager =
+                
createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation);
+
+        subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, 
false));
+        
assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(1)).isNotPresent();
+    }
+
+    @Test
+    void testConsumeBufferTrimHeadingReleasedBuffers() throws Exception {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder().build();
+        HsSubpartitionConsumerMemoryDataManager 
subpartitionConsumerMemoryDataManager =
+                
createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation);
+
+        HsBufferContext buffer1 = createBufferContext(0, false);
+        HsBufferContext buffer2 = createBufferContext(1, false);
+        subpartitionConsumerMemoryDataManager.addBuffer(buffer1);
+        subpartitionConsumerMemoryDataManager.addBuffer(buffer2);
+        subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, 
true));
+
+        buffer1.release();
+        buffer2.release();
+
+        
assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(2)).isPresent();
+    }
+
+    @Test
+    void testConsumeBufferReturnSlice() {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder().build();
+        HsSubpartitionConsumerMemoryDataManager 
subpartitionConsumerMemoryDataManager =
+                
createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation);
+
+        subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, 
false));
+
+        Optional<ResultSubpartition.BufferAndBacklog> bufferOpt =
+                subpartitionConsumerMemoryDataManager.consumeBuffer(0);
+        assertThat(bufferOpt)
+                .hasValueSatisfying(
+                        (bufferAndBacklog ->
+                                assertThat(bufferAndBacklog.buffer())
+                                        
.isInstanceOf(ReadOnlySlicedNetworkBuffer.class)));
+    }
+
+    @Test
+    void testAddBuffer() {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder().build();
+        HsSubpartitionConsumerMemoryDataManager 
subpartitionConsumerMemoryDataManager =
+                
createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation);
+        ArrayDeque<HsBufferContext> initialBuffers = new ArrayDeque<>();
+        initialBuffers.add(createBufferContext(0, false));
+        initialBuffers.add(createBufferContext(1, false));
+        
subpartitionConsumerMemoryDataManager.addInitialBuffers(initialBuffers);
+        subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, 
true));
+
+        assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(0))
+                .hasValueSatisfying(
+                        bufferAndBacklog -> {
+                            
assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
+                            assertThat(bufferAndBacklog.buffer().getDataType())
+                                    .isEqualTo(Buffer.DataType.DATA_BUFFER);
+                        });
+        assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(1))
+                .hasValueSatisfying(
+                        bufferAndBacklog -> {
+                            
assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(1);
+                            assertThat(bufferAndBacklog.buffer().getDataType())
+                                    .isEqualTo(Buffer.DataType.DATA_BUFFER);
+                        });
+        assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(2))
+                .hasValueSatisfying(
+                        bufferAndBacklog -> {
+                            
assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(2);
+                            assertThat(bufferAndBacklog.buffer().getDataType())
+                                    .isEqualTo(Buffer.DataType.EVENT_BUFFER);
+                        });
+    }
+
+    @Test
+    void testRelease() {
+        CompletableFuture<HsConsumerId> consumerReleasedFuture = new 
CompletableFuture<>();
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder()
+                        .setOnConsumerReleasedBiConsumer(
+                                (subpartitionId, consumerId) -> {
+                                    
consumerReleasedFuture.complete(consumerId);
+                                })
+                        .build();
+        HsConsumerId consumerId = HsConsumerId.newId(null);
+        HsSubpartitionConsumerMemoryDataManager 
subpartitionConsumerMemoryDataManager =
+                createSubpartitionConsumerMemoryDataManager(consumerId, 
memoryDataManagerOperation);
+        subpartitionConsumerMemoryDataManager.releaseDataView();
+        assertThat(consumerReleasedFuture).isCompletedWithValue(consumerId);
+    }
+
+    private static HsBufferContext createBufferContext(int bufferIndex, 
boolean isEvent) {
+        return new HsBufferContext(
+                createBuffer(BUFFER_SIZE, isEvent), bufferIndex, 
SUBPARTITION_ID);
+    }
+
+    private HsSubpartitionConsumerMemoryDataManager 
createSubpartitionConsumerMemoryDataManager(
+            HsMemoryDataManagerOperation memoryDataManagerOperation) {
+        return createSubpartitionConsumerMemoryDataManager(
+                HsConsumerId.DEFAULT, memoryDataManagerOperation);
+    }
+
+    private HsSubpartitionConsumerMemoryDataManager 
createSubpartitionConsumerMemoryDataManager(
+            HsConsumerId consumerId, HsMemoryDataManagerOperation 
memoryDataManagerOperation) {
+        return new HsSubpartitionConsumerMemoryDataManager(
+                new ReentrantLock(),
+                new ReentrantLock(),
+                // consumerMemoryDataManager is a member of 
subpartitionMemoryDataManager, using a
+                // fixed subpartition id is enough.
+                SUBPARTITION_ID,
+                consumerId,
+                memoryDataManagerOperation);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 03a560589d6..e60eff5dc09 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -75,7 +75,7 @@ class HsSubpartitionFileReaderImplTest {
 
     private HsFileDataIndex diskIndex;
 
-    private TestingSubpartitionViewInternalOperation subpartitionOperation;
+    private TestingSubpartitionConsumerInternalOperation subpartitionOperation;
 
     private FileChannel dataFileChannel;
 
@@ -87,7 +87,7 @@ class HsSubpartitionFileReaderImplTest {
         Path dataFilePath = 
Files.createFile(tempPath.resolve(UUID.randomUUID().toString()));
         dataFileChannel = openFileChannel(dataFilePath);
         diskIndex = new HsFileDataIndexImpl(1);
-        subpartitionOperation = new TestingSubpartitionViewInternalOperation();
+        subpartitionOperation = new 
TestingSubpartitionConsumerInternalOperation();
         currentFileOffset = 0L;
     }
 
@@ -99,10 +99,10 @@ class HsSubpartitionFileReaderImplTest {
     @Test
     void testReadBuffer() throws Exception {
         diskIndex = new HsFileDataIndexImpl(2);
-        TestingSubpartitionViewInternalOperation viewNotifier1 =
-                new TestingSubpartitionViewInternalOperation();
-        TestingSubpartitionViewInternalOperation viewNotifier2 =
-                new TestingSubpartitionViewInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier1 =
+                new TestingSubpartitionConsumerInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier2 =
+                new TestingSubpartitionConsumerInternalOperation();
         HsSubpartitionFileReaderImpl fileReader1 = 
createSubpartitionFileReader(0, viewNotifier1);
         HsSubpartitionFileReaderImpl fileReader2 = 
createSubpartitionFileReader(1, viewNotifier2);
 
@@ -142,8 +142,8 @@ class HsSubpartitionFileReaderImplTest {
                 new BufferDecompressor(bufferSize, compressionFactoryName);
 
         diskIndex = new HsFileDataIndexImpl(1);
-        TestingSubpartitionViewInternalOperation viewNotifier =
-                new TestingSubpartitionViewInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
         HsSubpartitionFileReaderImpl fileReader1 = 
createSubpartitionFileReader(0, viewNotifier);
 
         writeDataToFile(0, 0, 1, 3, bufferCompressor);
@@ -362,10 +362,10 @@ class HsSubpartitionFileReaderImplTest {
     @Test
     void testCompareTo() throws Exception {
         diskIndex = new HsFileDataIndexImpl(2);
-        TestingSubpartitionViewInternalOperation viewNotifier1 =
-                new TestingSubpartitionViewInternalOperation();
-        TestingSubpartitionViewInternalOperation viewNotifier2 =
-                new TestingSubpartitionViewInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier1 =
+                new TestingSubpartitionConsumerInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier2 =
+                new TestingSubpartitionConsumerInternalOperation();
         HsSubpartitionFileReaderImpl fileReader1 = 
createSubpartitionFileReader(0, viewNotifier1);
         HsSubpartitionFileReaderImpl fileReader2 = 
createSubpartitionFileReader(1, viewNotifier2);
         assertThat(fileReader1).isEqualByComparingTo(fileReader2);
@@ -390,8 +390,8 @@ class HsSubpartitionFileReaderImplTest {
 
     @Test
     void testConsumeBuffer() throws Throwable {
-        TestingSubpartitionViewInternalOperation viewNotifier =
-                new TestingSubpartitionViewInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
         HsSubpartitionFileReaderImpl subpartitionFileReader =
                 createSubpartitionFileReader(0, viewNotifier);
 
@@ -428,8 +428,8 @@ class HsSubpartitionFileReaderImplTest {
 
     @Test
     void testPeekNextToConsumeDataTypeOrConsumeBufferThrowException() {
-        TestingSubpartitionViewInternalOperation viewNotifier =
-                new TestingSubpartitionViewInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
         HsSubpartitionFileReaderImpl subpartitionFileReader =
                 createSubpartitionFileReader(0, viewNotifier);
 
@@ -446,8 +446,8 @@ class HsSubpartitionFileReaderImplTest {
 
     @Test
     void testPeekNextToConsumeDataType() throws Throwable {
-        TestingSubpartitionViewInternalOperation viewNotifier =
-                new TestingSubpartitionViewInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
         HsSubpartitionFileReaderImpl subpartitionFileReader =
                 createSubpartitionFileReader(0, viewNotifier);
 
@@ -477,8 +477,8 @@ class HsSubpartitionFileReaderImplTest {
      */
     @Test
     void testSubpartitionReaderRegisterMultipleTimes() throws Exception {
-        TestingSubpartitionViewInternalOperation viewNotifier =
-                new TestingSubpartitionViewInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier =
+                new TestingSubpartitionConsumerInternalOperation();
         HsSubpartitionFileReaderImpl subpartitionFileReader =
                 createSubpartitionFileReader(0, viewNotifier);
         // mock the scenario that buffer 0 is already read form memory.
@@ -491,7 +491,7 @@ class HsSubpartitionFileReaderImplTest {
         checkData(subpartitionFileReader, 2, 3);
 
         // after failover, new view and subpartitionFileReader will be created.
-        viewNotifier = new TestingSubpartitionViewInternalOperation();
+        viewNotifier = new TestingSubpartitionConsumerInternalOperation();
         subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier);
         subpartitionFileReader.prepareForScheduling();
         memorySegments = createsMemorySegments(3);
@@ -529,7 +529,7 @@ class HsSubpartitionFileReaderImplTest {
     }
 
     private HsSubpartitionFileReaderImpl createSubpartitionFileReader(
-            int targetChannel, HsSubpartitionViewInternalOperations 
operations) {
+            int targetChannel, HsSubpartitionConsumerInternalOperations 
operations) {
         return new HsSubpartitionFileReaderImpl(
                 targetChannel,
                 dataFileChannel,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
index 174c021990d..998c87397f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
-import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus;
@@ -53,11 +52,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId.DEFAULT;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatusWithId.ALL_ANY;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatusWithId.fromStatusAndConsumerId;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBufferBuilder;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createTestingOutputMetrics;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link HsSubpartitionMemoryDataManager}. */
 class HsSubpartitionMemoryDataManagerTest {
@@ -121,98 +123,9 @@ class HsSubpartitionMemoryDataManagerTest {
         assertThat(finishedBuffers).hasValue(2);
     }
 
-    @Test
-    void testPeekNextToConsumeDataTypeNotMeetBufferIndexToConsume() throws 
Exception {
-        TestingMemoryDataManagerOperation memoryDataManagerOperation =
-                TestingMemoryDataManagerOperation.builder()
-                        .setRequestBufferFromPoolSupplier(() -> 
createBufferBuilder(RECORD_SIZE))
-                        .build();
-        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
-                
createSubpartitionMemoryDataManager(memoryDataManagerOperation);
-
-        subpartitionMemoryDataManager.append(createRecord(0), 
DataType.DATA_BUFFER);
-
-        assertThat(subpartitionMemoryDataManager.peekNextToConsumeDataType(1))
-                .isEqualTo(DataType.NONE);
-    }
-
-    @Test
-    void testPeekNextToConsumeDataTypeTrimHeadingReleasedBuffers() throws 
Exception {
-        TestingMemoryDataManagerOperation memoryDataManagerOperation =
-                TestingMemoryDataManagerOperation.builder()
-                        .setRequestBufferFromPoolSupplier(() -> 
createBufferBuilder(RECORD_SIZE))
-                        .build();
-        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
-                
createSubpartitionMemoryDataManager(memoryDataManagerOperation);
-
-        subpartitionMemoryDataManager.append(createRecord(0), 
DataType.DATA_BUFFER);
-        subpartitionMemoryDataManager.append(createRecord(1), 
DataType.DATA_BUFFER);
-        subpartitionMemoryDataManager.append(createRecord(2), 
DataType.EVENT_BUFFER);
-
-        List<BufferIndexAndChannel> toRelease =
-                HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 
1);
-        subpartitionMemoryDataManager.releaseSubpartitionBuffers(toRelease);
-
-        assertThat(subpartitionMemoryDataManager.peekNextToConsumeDataType(2))
-                .isEqualTo(DataType.EVENT_BUFFER);
-    }
-
-    @Test
-    void testConsumeBufferFirstUnConsumedBufferIndexNotMeetNextToConsume() 
throws Exception {
-        TestingMemoryDataManagerOperation memoryDataManagerOperation =
-                TestingMemoryDataManagerOperation.builder()
-                        .setRequestBufferFromPoolSupplier(() -> 
createBufferBuilder(RECORD_SIZE))
-                        .build();
-        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
-                
createSubpartitionMemoryDataManager(memoryDataManagerOperation);
-
-        subpartitionMemoryDataManager.append(createRecord(0), 
DataType.DATA_BUFFER);
-
-        
assertThat(subpartitionMemoryDataManager.consumeBuffer(1)).isNotPresent();
-    }
-
-    @Test
-    void testConsumeBufferTrimHeadingReleasedBuffers() throws Exception {
-        TestingMemoryDataManagerOperation memoryDataManagerOperation =
-                TestingMemoryDataManagerOperation.builder()
-                        .setRequestBufferFromPoolSupplier(() -> 
createBufferBuilder(RECORD_SIZE))
-                        .build();
-        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
-                
createSubpartitionMemoryDataManager(memoryDataManagerOperation);
-
-        subpartitionMemoryDataManager.append(createRecord(0), 
DataType.DATA_BUFFER);
-        subpartitionMemoryDataManager.append(createRecord(1), 
DataType.DATA_BUFFER);
-        subpartitionMemoryDataManager.append(createRecord(2), 
DataType.EVENT_BUFFER);
-
-        List<BufferIndexAndChannel> toRelease =
-                HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 
1);
-        subpartitionMemoryDataManager.releaseSubpartitionBuffers(toRelease);
-
-        assertThat(subpartitionMemoryDataManager.consumeBuffer(2)).isPresent();
-    }
-
-    @Test
-    void testConsumeBufferReturnSlice() throws Exception {
-        TestingMemoryDataManagerOperation memoryDataManagerOperation =
-                TestingMemoryDataManagerOperation.builder()
-                        .setRequestBufferFromPoolSupplier(() -> 
createBufferBuilder(RECORD_SIZE))
-                        .build();
-        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
-                
createSubpartitionMemoryDataManager(memoryDataManagerOperation);
-
-        subpartitionMemoryDataManager.append(createRecord(0), 
DataType.DATA_BUFFER);
-
-        Optional<BufferAndBacklog> bufferOpt = 
subpartitionMemoryDataManager.consumeBuffer(0);
-        assertThat(bufferOpt)
-                .hasValueSatisfying(
-                        (bufferAndBacklog ->
-                                assertThat(bufferAndBacklog.buffer())
-                                        
.isInstanceOf(ReadOnlySlicedNetworkBuffer.class)));
-    }
-
     @ParameterizedTest
     @ValueSource(strings = {"LZ4", "LZO", "ZSTD", "NULL"})
-    void testConsumeBuffer(String compressionFactoryName) throws Exception {
+    void testCompressBufferAndConsume(String compressionFactoryName) throws 
Exception {
         final int numDataBuffers = 10;
         final int numRecordsPerBuffer = 10;
         // write numRecordsPerBuffer long record to one buffer, as a single 
long is
@@ -248,9 +161,11 @@ class HsSubpartitionMemoryDataManagerTest {
         subpartitionMemoryDataManager.append(createRecord(recordValue), 
DataType.EVENT_BUFFER);
         expectedRecords.add(Tuple2.of(recordValue, DataType.EVENT_BUFFER));
 
+        HsSubpartitionConsumerMemoryDataManager consumer =
+                subpartitionMemoryDataManager.registerNewConsumer(DEFAULT);
         ArrayList<Optional<BufferAndBacklog>> bufferAndBacklogOpts = new 
ArrayList<>();
         for (int i = 0; i < numDataBuffers + 1; i++) {
-            
bufferAndBacklogOpts.add(subpartitionMemoryDataManager.consumeBuffer(i));
+            bufferAndBacklogOpts.add(consumer.consumeBuffer(i));
         }
         checkConsumedBufferAndNextDataType(
                 numRecordsPerBuffer, bufferDecompressor, expectedRecords, 
bufferAndBacklogOpts);
@@ -276,6 +191,8 @@ class HsSubpartitionMemoryDataManagerTest {
                         .build();
         HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                 
createSubpartitionMemoryDataManager(memoryDataManagerOperation);
+        HsSubpartitionConsumerMemoryDataManager consumer =
+                subpartitionMemoryDataManager.registerNewConsumer(DEFAULT);
         final int numBuffers = 4;
         for (int i = 0; i < numBuffers; i++) {
             subpartitionMemoryDataManager.append(createRecord(i), 
DataType.DATA_BUFFER);
@@ -288,8 +205,8 @@ class HsSubpartitionMemoryDataManagerTest {
         
subpartitionMemoryDataManager.spillSubpartitionBuffers(toStartSpilling, 
spilledDoneFuture);
 
         // consume buffer 0, 1
-        subpartitionMemoryDataManager.consumeBuffer(0);
-        subpartitionMemoryDataManager.consumeBuffer(1);
+        consumer.consumeBuffer(0);
+        consumer.consumeBuffer(1);
 
         checkBufferIndex(
                 
subpartitionMemoryDataManager.getBuffersSatisfyStatus(SpillStatus.ALL, ALL_ANY),
@@ -432,6 +349,33 @@ class HsSubpartitionMemoryDataManagerTest {
         assertThat(metrics.getNumBytesOut().getCount()).isEqualTo(recordSize + 
eventSize);
     }
 
+    @Test
+    void testConsumerRegisterRepeatedly() {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder().build();
+        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
+                
createSubpartitionMemoryDataManager(memoryDataManagerOperation);
+
+        HsConsumerId consumerId = HsConsumerId.newId(null);
+        subpartitionMemoryDataManager.registerNewConsumer(consumerId);
+        assertThatThrownBy(() -> 
subpartitionMemoryDataManager.registerNewConsumer(consumerId))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    void testRegisterAndReleaseConsumer() {
+        TestingMemoryDataManagerOperation memoryDataManagerOperation =
+                TestingMemoryDataManagerOperation.builder().build();
+        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
+                
createSubpartitionMemoryDataManager(memoryDataManagerOperation);
+
+        HsConsumerId consumerId = HsConsumerId.newId(null);
+        subpartitionMemoryDataManager.registerNewConsumer(consumerId);
+        subpartitionMemoryDataManager.releaseConsumer(consumerId);
+        assertThatNoException()
+                .isThrownBy(() -> 
subpartitionMemoryDataManager.registerNewConsumer(consumerId));
+    }
+
     private static void checkBufferIndex(
             Deque<BufferIndexAndChannel> bufferWithIdentities, List<Integer> 
expectedIndexes) {
         List<Integer> bufferIndexes =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index 451ab8980da..11ea5c3766b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -42,11 +42,11 @@ import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffle
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link HsSubpartitionView}. */
+/** Tests for {@link HsSubpartitionConsumer}. */
 class HsSubpartitionViewTest {
     @Test
     void testGetNextBufferFromDisk() {
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
 
         BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(1, 
DataType.DATA_BUFFER, 0);
         CompletableFuture<Void> consumeBufferFromMemoryFuture = new 
CompletableFuture<>();
@@ -77,7 +77,7 @@ class HsSubpartitionViewTest {
         final int bufferSize = 16;
         NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 
bufferSize);
         BufferPool bufferPool = networkBufferPool.createBufferPool(10, 10);
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
 
         CompletableFuture<Void> acquireWriteLock = new CompletableFuture<>();
 
@@ -102,7 +102,8 @@ class HsSubpartitionViewTest {
                                     } catch (Exception e) {
                                         throw new RuntimeException(e);
                                     }
-                                    
spillingInfoProvider.getNextBufferIndexToConsume();
+                                    
spillingInfoProvider.getNextBufferIndexToConsume(
+                                            HsConsumerId.DEFAULT);
                                     return 
HsSpillingStrategy.Decision.NO_ACTION;
                                 })
                         .build();
@@ -117,7 +118,8 @@ class HsSubpartitionViewTest {
                         null,
                         0);
         memoryDataManager.setOutputMetrics(createTestingOutputMetrics());
-        HsDataView hsDataView = memoryDataManager.registerSubpartitionView(0, 
subpartitionView);
+        HsDataView hsDataView =
+                memoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, 
subpartitionView);
         subpartitionView.setMemoryDataView(hsDataView);
         subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
 
@@ -128,7 +130,7 @@ class HsSubpartitionViewTest {
 
     @Test
     void testGetNextBufferFromDiskNextDataTypeIsNone() {
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
         BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, 
DataType.NONE, 0);
 
         TestingHsDataView diskDataView =
@@ -158,7 +160,7 @@ class HsSubpartitionViewTest {
 
     @Test
     void testGetNextBufferFromMemory() {
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
 
         BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(1, 
DataType.DATA_BUFFER, 0);
         TestingHsDataView memoryDataView =
@@ -179,7 +181,7 @@ class HsSubpartitionViewTest {
 
     @Test
     void testGetNextBufferThrowException() {
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
 
         TestingHsDataView diskDataView =
                 TestingHsDataView.builder()
@@ -200,7 +202,7 @@ class HsSubpartitionViewTest {
 
     @Test
     void testGetNextBufferZeroBacklog() {
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
 
         final int diskBacklog = 0;
         final int memoryBacklog = 10;
@@ -236,7 +238,7 @@ class HsSubpartitionViewTest {
     @Test
     void testNotifyDataAvailableNeedNotify() {
         CompletableFuture<Void> notifyAvailableFuture = new 
CompletableFuture<>();
-        HsSubpartitionView subpartitionView =
+        HsSubpartitionConsumer subpartitionView =
                 createSubpartitionView(() -> 
notifyAvailableFuture.complete(null));
 
         TestingHsDataView memoryDataView =
@@ -256,7 +258,7 @@ class HsSubpartitionViewTest {
     @Test
     void testNotifyDataAvailableNotNeedNotify() {
         CompletableFuture<Void> notifyAvailableFuture = new 
CompletableFuture<>();
-        HsSubpartitionView subpartitionView =
+        HsSubpartitionConsumer subpartitionView =
                 createSubpartitionView(() -> 
notifyAvailableFuture.complete(null));
 
         TestingHsDataView memoryDataView =
@@ -277,7 +279,7 @@ class HsSubpartitionViewTest {
     @Test
     void testGetZeroBacklogNeedNotify() {
         CompletableFuture<Void> notifyAvailableFuture = new 
CompletableFuture<>();
-        HsSubpartitionView subpartitionView =
+        HsSubpartitionConsumer subpartitionView =
                 createSubpartitionView(() -> 
notifyAvailableFuture.complete(null));
         subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
         subpartitionView.setDiskDataView(
@@ -294,7 +296,7 @@ class HsSubpartitionViewTest {
 
     @Test
     void testGetAvailabilityAndBacklogPositiveCredit() {
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
         subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
 
         final int backlog = 2;
@@ -311,7 +313,7 @@ class HsSubpartitionViewTest {
     void testGetAvailabilityAndBacklogNonPositiveCreditNextIsData() {
         final int backlog = 2;
 
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
         subpartitionView.setMemoryDataView(
                 TestingHsDataView.builder()
                         .setConsumeBufferFunction(
@@ -336,7 +338,7 @@ class HsSubpartitionViewTest {
     void testGetAvailabilityAndBacklogNonPositiveCreditNextIsEvent() {
         final int backlog = 2;
 
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
         subpartitionView.setMemoryDataView(
                 TestingHsDataView.builder()
                         .setConsumeBufferFunction(
@@ -359,23 +361,29 @@ class HsSubpartitionViewTest {
 
     @Test
     void testRelease() throws Exception {
-        HsSubpartitionView subpartitionView = createSubpartitionView();
-        CompletableFuture<Void> releaseDataViewFuture = new 
CompletableFuture<>();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
+        CompletableFuture<Void> releaseDiskViewFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> releaseMemoryViewFuture = new 
CompletableFuture<>();
         TestingHsDataView diskDataView =
                 TestingHsDataView.builder()
-                        .setReleaseDataViewRunnable(() -> 
releaseDataViewFuture.complete(null))
+                        .setReleaseDataViewRunnable(() -> 
releaseDiskViewFuture.complete(null))
+                        .build();
+        TestingHsDataView memoryDataView =
+                TestingHsDataView.builder()
+                        .setReleaseDataViewRunnable(() -> 
releaseMemoryViewFuture.complete(null))
                         .build();
         subpartitionView.setDiskDataView(diskDataView);
-        subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
+        subpartitionView.setMemoryDataView(memoryDataView);
         subpartitionView.releaseAllResources();
         assertThat(subpartitionView.isReleased()).isTrue();
-        assertThat(releaseDataViewFuture).isCompleted();
+        assertThat(releaseDiskViewFuture).isCompleted();
+        assertThat(releaseMemoryViewFuture).isCompleted();
     }
 
     @Test
     void testGetConsumingOffset() {
         AtomicInteger nextBufferIndex = new AtomicInteger(0);
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
         TestingHsDataView diskDataView =
                 TestingHsDataView.builder()
                         .setConsumeBufferFunction(
@@ -398,7 +406,7 @@ class HsSubpartitionViewTest {
 
     @Test
     void testSetDataViewRepeatedly() {
-        HsSubpartitionView subpartitionView = createSubpartitionView();
+        HsSubpartitionConsumer subpartitionView = createSubpartitionView();
 
         subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
         assertThatThrownBy(() -> 
subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP))
@@ -411,13 +419,13 @@ class HsSubpartitionViewTest {
                 .hasMessageContaining("repeatedly set disk data view is not 
allowed.");
     }
 
-    private static HsSubpartitionView createSubpartitionView() {
-        return new HsSubpartitionView(new NoOpBufferAvailablityListener());
+    private static HsSubpartitionConsumer createSubpartitionView() {
+        return new HsSubpartitionConsumer(new NoOpBufferAvailablityListener());
     }
 
-    private static HsSubpartitionView createSubpartitionView(
+    private static HsSubpartitionConsumer createSubpartitionView(
             BufferAvailabilityListener bufferAvailabilityListener) {
-        return new HsSubpartitionView(bufferAvailabilityListener);
+        return new HsSubpartitionConsumer(bufferAvailabilityListener);
     }
 
     private static BufferAndBacklog createBufferAndBacklog(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
index 2b16d5d4d4f..5ac37c2a1d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.util.function.SupplierWithException;
 
+import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -37,18 +38,22 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
 
     private final Runnable onDataAvailableRunnable;
 
+    private final BiConsumer<Integer, HsConsumerId> 
onConsumerReleasedBiConsumer;
+
     private TestingMemoryDataManagerOperation(
             SupplierWithException<BufferBuilder, InterruptedException>
                     requestBufferFromPoolSupplier,
             BiConsumer<Integer, Integer> markBufferReadableConsumer,
             Consumer<BufferIndexAndChannel> onBufferConsumedConsumer,
             Runnable onBufferFinishedRunnable,
-            Runnable onDataAvailableRunnable) {
+            Runnable onDataAvailableRunnable,
+            BiConsumer<Integer, HsConsumerId> onConsumerReleasedBiConsumer) {
         this.requestBufferFromPoolSupplier = requestBufferFromPoolSupplier;
         this.markBufferReadableConsumer = markBufferReadableConsumer;
         this.onBufferConsumedConsumer = onBufferConsumedConsumer;
         this.onBufferFinishedRunnable = onBufferFinishedRunnable;
         this.onDataAvailableRunnable = onDataAvailableRunnable;
+        this.onConsumerReleasedBiConsumer = onConsumerReleasedBiConsumer;
     }
 
     @Override
@@ -72,10 +77,15 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
     }
 
     @Override
-    public void onDataAvailable(int subpartitionId) {
+    public void onDataAvailable(int subpartitionId, Collection<HsConsumerId> 
consumerIds) {
         onDataAvailableRunnable.run();
     }
 
+    @Override
+    public void onConsumerReleased(int subpartitionId, HsConsumerId 
consumerId) {
+        onConsumerReleasedBiConsumer.accept(subpartitionId, consumerId);
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -93,6 +103,9 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
 
         private Runnable onDataAvailableRunnable = () -> {};
 
+        private BiConsumer<Integer, HsConsumerId> onConsumerReleasedBiConsumer 
=
+                (ignore1, ignore2) -> {};
+
         public Builder setRequestBufferFromPoolSupplier(
                 SupplierWithException<BufferBuilder, InterruptedException>
                         requestBufferFromPoolSupplier) {
@@ -122,6 +135,12 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
             return this;
         }
 
+        public Builder setOnConsumerReleasedBiConsumer(
+                BiConsumer<Integer, HsConsumerId> 
onConsumerReleasedBiConsumer) {
+            this.onConsumerReleasedBiConsumer = onConsumerReleasedBiConsumer;
+            return this;
+        }
+
         private Builder() {}
 
         public TestingMemoryDataManagerOperation build() {
@@ -130,7 +149,8 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
                     markBufferReadableConsumer,
                     onBufferConsumedConsumer,
                     onBufferFinishedRunnable,
-                    onDataAvailableRunnable);
+                    onDataAvailableRunnable,
+                    onConsumerReleasedBiConsumer);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
index 67b34d0e774..9980f4176fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
@@ -73,7 +73,7 @@ public class TestingSpillingInfoProvider implements 
HsSpillingInfoProvider {
     }
 
     @Override
-    public List<Integer> getNextBufferIndexToConsume() {
+    public List<Integer> getNextBufferIndexToConsume(HsConsumerId consumerId) {
         return getNextBufferIndexToConsumeSupplier.get();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionConsumerInternalOperation.java
similarity index 88%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionConsumerInternalOperation.java
index ff2f8250b2a..1f32189da53 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionConsumerInternalOperation.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
-/** Mock {@link HsSubpartitionViewInternalOperations} for test. */
-public class TestingSubpartitionViewInternalOperation
-        implements HsSubpartitionViewInternalOperations {
+/** Mock {@link HsSubpartitionConsumerInternalOperations} for test. */
+public class TestingSubpartitionConsumerInternalOperation
+        implements HsSubpartitionConsumerInternalOperations {
     // -1 indicates downstream just start consuming offset.
     private int consumingOffset = -1;
 


Reply via email to