xintongsong commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1001497856
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
- private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
- new ConcurrentHashMap<>();
+ private final List<Map<Integer, HsSubpartitionViewInternalOperations>>
+ subpartitionViewOperationsMap;
Review Comment:
The definition of this field should be documented: what the outer `List` index and the inner `Map` key represent.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java:
##########
@@ -79,4 +79,24 @@ enum ConsumeStatus {
/** The buffer is either consumed or not consumed. */
ALL
}
+
+ /** This class represents a pair of {@link ConsumeStatus} and consumer id. */
+ class ConsumeStatusWithId {
+ public static final ConsumeStatusWithId ALL_CONSUME_STATUS =
+ new ConsumeStatusWithId(ConsumeStatus.ALL, -1);
+
+ ConsumeStatus status;
+
+ int consumerId;
Review Comment:
I think we need a dedicated class for the consumer id, where we can define
the special values such as `ANY` and `SINGLE_CONSUMER`.
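A minimal sketch of what such a class could look like. The name `HsConsumerId`, the factory method `of`, and the value `0` for `SINGLE_CONSUMER` are assumptions for illustration; `-1` for `ANY` mirrors the sentinel already used in `ALL_CONSUME_STATUS` above.

```java
import java.util.Objects;

/** Hypothetical value class wrapping a consumer id; the name and values are assumptions. */
final class HsConsumerId {
    /** Matches any consumer; -1 mirrors the sentinel used in ALL_CONSUME_STATUS. */
    static final HsConsumerId ANY = new HsConsumerId(-1);

    /** Assumed id of the only consumer when a subpartition has exactly one. */
    static final HsConsumerId SINGLE_CONSUMER = new HsConsumerId(0);

    private final int id;

    private HsConsumerId(int id) {
        this.id = id;
    }

    /** Creates an id for a regular consumer; negative values stay reserved for sentinels. */
    static HsConsumerId of(int id) {
        if (id < 0) {
            throw new IllegalArgumentException("Consumer id must be non-negative: " + id);
        }
        return new HsConsumerId(id);
    }

    int getId() {
        return id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof HsConsumerId && ((HsConsumerId) o).id == id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    @Override
    public String toString() {
        return "HsConsumerId{" + id + "}";
    }
}
```

With this, `ConsumeStatusWithId` could hold an `HsConsumerId` instead of a raw `int`, so the special values are named in one place.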
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
- private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
- new ConcurrentHashMap<>();
+ private final List<Map<Integer, HsSubpartitionViewInternalOperations>>
+ subpartitionViewOperationsMap;
Review Comment:
Can we replace the outer `List` with an array here, to align with
`subpartitionMemoryDataManagers`?
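A rough sketch of the array variant, under the assumption that the number of subpartitions is known at construction time. `ViewRegistry` and its methods are hypothetical names, and the generic parameter `V` stands in for `HsSubpartitionViewInternalOperations`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder: outer array indexed by subpartition id, inner map keyed by consumer id. */
final class ViewRegistry<V> {
    private final Map<Integer, V>[] viewsBySubpartition;

    // Java does not allow creating generic arrays directly, so a raw array is
    // created and the unchecked assignment is suppressed.
    @SuppressWarnings({"unchecked", "rawtypes"})
    ViewRegistry(int numSubpartitions) {
        this.viewsBySubpartition = new Map[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            viewsBySubpartition[i] = new ConcurrentHashMap<>();
        }
    }

    /** Registers a view and returns the previously registered one, or null if none. */
    V register(int subpartitionId, int consumerId, V view) {
        return viewsBySubpartition[subpartitionId].put(consumerId, view);
    }

    /** Returns the view for the given subpartition and consumer, or null if absent. */
    V get(int subpartitionId, int consumerId) {
        return viewsBySubpartition[subpartitionId].get(consumerId);
    }
}
```

The cost of the array form is the unchecked generic array creation; the benefit is consistency with the existing per-subpartition array.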
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
- private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap =
- new ConcurrentHashMap<>();
+ private final List<Map<Integer, HsSubpartitionViewInternalOperations>>
+ subpartitionViewOperationsMap;
Review Comment:
Why is the inner collection a `Map` rather than a `List`? With a list, we won't need
the `consumerIdCounter` in `HsResultPartition`.
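To illustrate the idea: with a list, the consumer id can simply be the index at which the view was appended, so no separate counter is required. This is only a sketch; `ConsumerViewList` is a hypothetical name, `V` stands in for `HsSubpartitionViewInternalOperations`, and it assumes views are never removed (removal would shift or invalidate indices).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-subpartition view list where the list index doubles as the consumer id. */
final class ConsumerViewList<V> {
    private final List<V> views = new ArrayList<>();

    /** Appends the view and returns its index, which serves as the consumer id. */
    synchronized int register(V view) {
        views.add(view);
        return views.size() - 1;
    }

    /** Looks up the view registered under the given consumer id. */
    synchronized V get(int consumerId) {
        return views.get(consumerId);
    }

    /** Number of consumers registered so far. */
    synchronized int size() {
        return views.size();
    }
}
```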
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -161,15 +164,15 @@ public void append(ByteBuffer record, int targetChannel, Buffer.DataType dataTyp
* subpartition.
*/
public HsDataView registerSubpartitionView(
- int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) {
+ int subpartitionId,
+ int consumerId,
+ HsSubpartitionViewInternalOperations viewOperations) {
HsSubpartitionViewInternalOperations oldView =
- subpartitionViewOperationsMap.put(subpartitionId, viewOperations);
- if (oldView != null) {
- LOG.debug(
- "subpartition : {} register subpartition view will replace old view. ",
- subpartitionId);
- }
- return getSubpartitionMemoryDataManager(subpartitionId);
+ subpartitionViewOperationsMap.get(subpartitionId).put(consumerId, viewOperations);
+ Preconditions.checkState(
+ oldView == null, "Each subpartition view should have unique consumerId.");
+ return getSubpartitionMemoryDataManager(subpartitionId)
Review Comment:
Shall we also check that the selective strategy does not have multiple
consumers?
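Something along these lines, as a sketch only. `SpillingStrategyKind` and `GuardedViewRegistry` are hypothetical names, not existing Flink types, and the check assumes the selective strategy is meant to serve at most one consumer per subpartition.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the spilling strategy kind; not Flink's actual type. */
enum SpillingStrategyKind {
    FULL,
    SELECTIVE
}

/** Sketch of per-subpartition view registration with a single-consumer guard. */
final class GuardedViewRegistry<V> {
    private final SpillingStrategyKind strategy;
    private final Map<Integer, V> viewsByConsumer = new HashMap<>();

    GuardedViewRegistry(SpillingStrategyKind strategy) {
        this.strategy = strategy;
    }

    /** Registers a view, rejecting a second consumer under the selective strategy. */
    synchronized void register(int consumerId, V view) {
        if (strategy == SpillingStrategyKind.SELECTIVE && !viewsByConsumer.isEmpty()) {
            throw new IllegalStateException(
                    "Selective strategy does not support multiple consumers.");
        }
        if (viewsByConsumer.containsKey(consumerId)) {
            throw new IllegalStateException(
                    "Each subpartition view should have unique consumerId.");
        }
        viewsByConsumer.put(consumerId, view);
    }

    /** Number of consumers registered so far. */
    synchronized int numConsumers() {
        return viewsByConsumer.size();
    }
}
```

In the actual change this would more likely be another `Preconditions.checkState` next to the existing uniqueness check.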