xintongsong commented on code in PR #19885:
URL: https://github.com/apache/flink/pull/19885#discussion_r890886104


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -104,13 +118,40 @@ public enum ResultPartitionType {
      */
     private final boolean isReconnectable;
 
+    private final ConsumeType consumeType;
+
+    private final ReleaseType releaseType;
+
+    /** ConsumeType indicates when can the downstream consume the upstream. */
+    private enum ConsumeType {

Review Comment:
   I'd suggest naming this `ConsumingConstraint`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -177,6 +177,32 @@ public boolean isReleaseByScheduler() {
         return releaseType == ReleaseType.RELEASE_BY_SCHEDULER;
     }
 
+    /**
+     * {@link #isBlockingOrBlockingPersistentResultPartition()} is used to judge whether it is the
+     * specified {@link #BLOCKING} or {@link #BLOCKING_PERSISTENT} resultPartitionType.
+     *
+     * <p>this method suitable for judgment conditions related to the specific implementation of
+     * {@link ResultPartitionType}.
+     *
+     * <p>this method not related to data consumption and partition release. As for the logic
+     * related to partition release, use {@link #isReleaseByScheduler()} instead, and as consume
+     * type, use {@link #mustBePipelinedConsumed()} or {@link #canBePipelinedConsumed()} instead.
+     */
+    public boolean isBlockingOrBlockingPersistentResultPartition() {
+        return this == BLOCKING || this == BLOCKING_PERSISTENT;
+    }
+
+    /**
+     * {@link #isPipelinedOrPipelinedBoundedResultPartition()} is used to judge whether it is the
+     * specified {@link #PIPELINED} or {@link #PIPELINED_BOUNDED} resultPartitionType.
+     *
+     * <p>This method suitable for judgment conditions related to the specific implementation of
+     * {@link ResultPartitionType}.
+     *
+     * <p>This method not related to data consumption and partition release. As for the logic
+     * related to partition release, use {@link #isReleaseByScheduler()} instead, and as consume
+     * type, use {@link #mustBePipelinedConsumed()} or {@link #canBePipelinedConsumed()} instead.
+     */

Review Comment:
   This change belongs to the wrong commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -104,13 +118,40 @@ public enum ResultPartitionType {
      */
     private final boolean isReconnectable;
 
+    private final ConsumeType consumeType;
+
+    private final ReleaseType releaseType;
+
+    /** ConsumeType indicates when can the downstream consume the upstream. */
+    private enum ConsumeType {
+        /** Upstream must be finished before downstream consume. */
+        BLOCKING,
+        /** Downstream can consume while upstream is running. */
+        CAN_BE_PIPELINED,
+        /** Downstream must consume while upstream is running. */
+        MUST_BE_PIPELINED
+    }
+
+    /** ReleaseType indicates who is responsible for releasing the result partition. */
+    private enum ReleaseType {
+        RELEASE_BY_UPSTREAM,
+        RELEASE_BY_SCHEDULER
+    }

Review Comment:
   We may also want to add a comment noting that unifying how result partitions
are released is future work. Let's create a JIRA ticket (if there isn't one
yet) and refer to it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java:
##########
@@ -104,13 +118,40 @@ public enum ResultPartitionType {
      */
     private final boolean isReconnectable;
 
+    private final ConsumeType consumeType;
+
+    private final ReleaseType releaseType;
+
+    /** ConsumeType indicates when can the downstream consume the upstream. */
+    private enum ConsumeType {
+        /** Upstream must be finished before downstream consume. */
+        BLOCKING,
+        /** Downstream can consume while upstream is running. */
+        CAN_BE_PIPELINED,
+        /** Downstream must consume while upstream is running. */
+        MUST_BE_PIPELINED
+    }
+
+    /** ReleaseType indicates who is responsible for releasing the result partition. */
+    private enum ReleaseType {
+        RELEASE_BY_UPSTREAM,
+        RELEASE_BY_SCHEDULER
+    }

Review Comment:
   I'd suggest the following:
   ```
   private enum ReleaseBy {
       UPSTREAM,
       SCHEDULER
   }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java:
##########
@@ -70,10 +70,8 @@ public void startTrackingPartition(
         Preconditions.checkNotNull(producingTaskExecutorId);
         Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
 
-        // blocking and PIPELINED_APPROXIMATE partitions require explicit partition release calls
-        // reconnectable will be removed after FLINK-19895, see also {@link
-        // ResultPartitionType#isReconnectable}.
-        if (!resultPartitionDeploymentDescriptor.getPartitionType().isReconnectable()) {
+        // non-releaseByScheduler partitions don't require explicit partition release calls.
+        if (!resultPartitionDeploymentDescriptor.getPartitionType().isReleaseByScheduler()) {

Review Comment:
   This assumes that `!isReleaseByScheduler()` means released by upstream.
That is not safe: `ReleaseType` is an enum that happens to have two values now
but could have more in the future. I'd suggest introducing another method,
`isReleasedByUpstream()`.
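
   A minimal sketch of what that could look like. The class below is a
hypothetical, simplified stand-in for `ResultPartitionType`, not the actual
Flink code. It uses the `ReleaseBy` naming suggested above, and the mapping of
the two partition types to release owners is an assumption made only for
illustration.

```java
// Hypothetical, simplified stand-in for ResultPartitionType; not the actual Flink class.
final class ReleaseBySketch {

    /** Who is responsible for releasing a result partition. */
    enum ReleaseBy {
        UPSTREAM,
        SCHEDULER
    }

    /** Assumed mapping, for illustration only: BLOCKING -> SCHEDULER, PIPELINED -> UPSTREAM. */
    enum PartitionType {
        BLOCKING(ReleaseBy.SCHEDULER),
        PIPELINED(ReleaseBy.UPSTREAM);

        private final ReleaseBy releaseBy;

        PartitionType(ReleaseBy releaseBy) {
            this.releaseBy = releaseBy;
        }

        boolean isReleaseByScheduler() {
            return releaseBy == ReleaseBy.SCHEDULER;
        }

        // Explicit positive check: callers no longer rely on "not scheduler"
        // implying "upstream", so a third ReleaseBy value cannot be misread.
        boolean isReleasedByUpstream() {
            return releaseBy == ReleaseBy.UPSTREAM;
        }
    }

    public static void main(String[] args) {
        for (PartitionType type : PartitionType.values()) {
            System.out.println(type
                    + ": byScheduler=" + type.isReleaseByScheduler()
                    + ", byUpstream=" + type.isReleasedByUpstream());
        }
    }
}
```

   With both predicates available, the check in `startTrackingPartition` can
state its intent directly instead of negating `isReleaseByScheduler()`.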


