xintongsong commented on code in PR #21560:
URL: https://github.com/apache/flink/pull/21560#discussion_r1062282342


##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -412,4 +430,28 @@ public static PartitionLocationConstraint 
fromJobType(JobType jobType) {
             }
         }
     }
+
+    /**
+     * This class represents the shuffle descriptor with it index in {@link 
ConsumedPartitionGroup}.
+     */
+    public static class ShuffleDescriptorAndIndex implements Serializable {
+        private static final long serialVersionUID = 852181945034989215L;

Review Comment:
   The `serialVersionUID` should start at `1L`.



##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -650,24 +650,46 @@ public enum SchedulerType {
                     .withDescription(
                             "The JobManager's ResourceID. If not configured, 
the ResourceID will be generated randomly.");
 
+    /** Type of consume partition mode of hybrid edge. */
+    public enum HybridConsumePartitionMode {
+        ONLY_CONSUME_ALL_FINISHED(true),
+        CAN_CONSUME_PARTIAL_FINISHED(true),
+        CAN_CONSUME_UN_FINISHED(false);
+
+        private final boolean onlyConsumeFinishedPartition;
+
+        HybridConsumePartitionMode(boolean onlyConsumeFinishedPartition) {
+            this.onlyConsumeFinishedPartition = onlyConsumeFinishedPartition;
+        }
+
+        public boolean isOnlyConsumeFinishedPartition() {
+            return onlyConsumeFinishedPartition;
+        }
+    }
+
     @Documentation.Section({
         Documentation.Sections.EXPERT_SCHEDULING,
         Documentation.Sections.ALL_JOB_MANAGER
     })
-    public static final ConfigOption<Boolean> ONLY_CONSUME_FINISHED_PARTITION =
-            key("jobmanager.partition.hybrid.only-consume-finished-partition")
-                    .booleanType()
+    public static final ConfigOption<HybridConsumePartitionMode> 
HYBRID_CONSUME_PARTITION_MODE =
+            key("jobmanager.partition.hybrid.consume-partition-mode")
+                    .enumType(HybridConsumePartitionMode.class)
                     .noDefaultValue()
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Controls whether the scheduler 
only allows downstream task consume finished partition. "
-                                                    + "Note that this option 
is allowed only when %s has been set to %s, "
-                                                    + "and if you also enable 
speculative execution(%s has been set to true),"
-                                                    + "this option can only be 
set to true.",
+                                            "Controls how the downstream task 
consumes hybrid type producer edge. "
+                                                    + "Note that this option 
is allowed only when %s has been set to %s. "
+                                                    + "Accepted values are:",
                                             code(SCHEDULER.key()),
-                                            
code(SchedulerType.AdaptiveBatch.name()),
-                                            code(SPECULATIVE_ENABLED.key()))
+                                            
code(SchedulerType.AdaptiveBatch.name()))
+                                    .list(
+                                            text(
+                                                    
"'ONLY_CONSUME_ALL_FINISHED': Downstream tasks can consume data only when all 
producers are finished."),

Review Comment:
   Use the enum constants' names (e.g. 
`HybridConsumePartitionMode.CAN_CONSUME_PARTIAL_FINISHED.name()`) instead of 
hard-coding them in the description strings.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup}. */
+public class CachedShuffleDescriptors {
+    /**
+     * Stores all serialized shuffle descriptors with indexes. For a 
partition, it may be added a
+     * serialized unknown shuffle descriptor to this list first, and then 
added the real descriptor
+     * later.
+     */
+    private final List<MaybeOffloaded<ShuffleDescriptorAndIndex[]>> 
serializedShuffleDescriptors;
+
+    /**
+     * Stores all to be serialized shuffle descriptors, They will be 
serialized and added to
+     * serializedShuffleDescriptors during the next time 
TaskDeploymentDescriptor is generated.
+     */
+    private final Queue<ShuffleDescriptorAndIndex> toBeSerialized;
+
+    /** Stores the mapping of resultPartitionId to index subscripts in 
consumed partition group. */
+    private final Map<IntermediateResultPartitionID, Integer> 
resultPartitionIdToIndex;
+
+    public CachedShuffleDescriptors(
+            ConsumedPartitionGroup consumedPartitionGroup,
+            MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
initialSerializedShuffleDescriptors) {
+        this.resultPartitionIdToIndex = new HashMap<>();
+        int index = 0;
+        for (IntermediateResultPartitionID resultPartitionID : 
consumedPartitionGroup) {
+            resultPartitionIdToIndex.put(resultPartitionID, index++);
+        }
+        this.toBeSerialized = new ArrayDeque<>(consumedPartitionGroup.size());
+        this.serializedShuffleDescriptors = new ArrayList<>();
+        
this.serializedShuffleDescriptors.add(initialSerializedShuffleDescriptors);
+    }
+
+    public MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
getShuffleDescriptors(int index) {

Review Comment:
   The concept `index` is confusing, because the same term has different 
semantics in `resultPartitionIdToIndex` and here.



##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -650,24 +650,46 @@ public enum SchedulerType {
                     .withDescription(
                             "The JobManager's ResourceID. If not configured, 
the ResourceID will be generated randomly.");
 
+    /** Type of consume partition mode of hybrid edge. */
+    public enum HybridConsumePartitionMode {
+        ONLY_CONSUME_ALL_FINISHED(true),
+        CAN_CONSUME_PARTIAL_FINISHED(true),
+        CAN_CONSUME_UN_FINISHED(false);

Review Comment:
   ```
   public enum HybridPartitionDataConsumeConstraint {
       ALL_PRODUCERS_FINISHED,
       ONLY_FINISHED_PRODUCERS,
       UNFINISHED_PRODUCERS
   }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup}. */
+public class CachedShuffleDescriptors {
+    /**
+     * Stores all serialized shuffle descriptors with indexes. For a 
partition, it may be added a
+     * serialized unknown shuffle descriptor to this list first, and then 
added the real descriptor
+     * later.
+     */
+    private final List<MaybeOffloaded<ShuffleDescriptorAndIndex[]>> 
serializedShuffleDescriptors;
+
+    /**
+     * Stores all to be serialized shuffle descriptors, They will be 
serialized and added to
+     * serializedShuffleDescriptors during the next time 
TaskDeploymentDescriptor is generated.
+     */
+    private final Queue<ShuffleDescriptorAndIndex> toBeSerialized;
+
+    /** Stores the mapping of resultPartitionId to index subscripts in 
consumed partition group. */
+    private final Map<IntermediateResultPartitionID, Integer> 
resultPartitionIdToIndex;
+
+    public CachedShuffleDescriptors(
+            ConsumedPartitionGroup consumedPartitionGroup,
+            MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
initialSerializedShuffleDescriptors) {

Review Comment:
   I wonder if we can pass in unserialized shuffle descriptors and serialize 
them inside the cache. Currently, the descriptors are serialized both 
outside and inside the cache, which duplicates the serialization logic.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup}. */
+public class CachedShuffleDescriptors {
+    /**
+     * Stores all serialized shuffle descriptors with indexes. For a 
partition, it may be added a
+     * serialized unknown shuffle descriptor to this list first, and then 
added the real descriptor
+     * later.
+     */
+    private final List<MaybeOffloaded<ShuffleDescriptorAndIndex[]>> 
serializedShuffleDescriptors;
+
+    /**
+     * Stores all to be serialized shuffle descriptors, They will be 
serialized and added to
+     * serializedShuffleDescriptors during the next time 
TaskDeploymentDescriptor is generated.
+     */
+    private final Queue<ShuffleDescriptorAndIndex> toBeSerialized;
+
+    /** Stores the mapping of resultPartitionId to index subscripts in 
consumed partition group. */
+    private final Map<IntermediateResultPartitionID, Integer> 
resultPartitionIdToIndex;
+
+    public CachedShuffleDescriptors(
+            ConsumedPartitionGroup consumedPartitionGroup,
+            MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
initialSerializedShuffleDescriptors) {
+        this.resultPartitionIdToIndex = new HashMap<>();
+        int index = 0;
+        for (IntermediateResultPartitionID resultPartitionID : 
consumedPartitionGroup) {
+            resultPartitionIdToIndex.put(resultPartitionID, index++);
+        }
+        this.toBeSerialized = new ArrayDeque<>(consumedPartitionGroup.size());
+        this.serializedShuffleDescriptors = new ArrayList<>();
+        
this.serializedShuffleDescriptors.add(initialSerializedShuffleDescriptors);
+    }
+
+    public MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
getShuffleDescriptors(int index) {

Review Comment:
   I think we don't need `getShuffleDescriptors(int index)` and 
`getSerializedShuffleDescriptorsSize()` at all. They are only used in a 
for-loop in `IntermediateResult`, where we can simply use 
`getAllSerializedShuffleDescriptors()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to