xintongsong commented on code in PR #21560:
URL: https://github.com/apache/flink/pull/21560#discussion_r1062282342
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -412,4 +430,28 @@ public static PartitionLocationConstraint
fromJobType(JobType jobType) {
}
}
}
+
+ /**
+ * This class represents the shuffle descriptor with its index in {@link
ConsumedPartitionGroup}.
+ */
+ public static class ShuffleDescriptorAndIndex implements Serializable {
+ private static final long serialVersionUID = 852181945034989215L;
Review Comment:
Serial version uid should start with `1L`.
##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -650,24 +650,46 @@ public enum SchedulerType {
.withDescription(
"The JobManager's ResourceID. If not configured,
the ResourceID will be generated randomly.");
+ /** Type of consume partition mode of hybrid edge. */
+ public enum HybridConsumePartitionMode {
+ ONLY_CONSUME_ALL_FINISHED(true),
+ CAN_CONSUME_PARTIAL_FINISHED(true),
+ CAN_CONSUME_UN_FINISHED(false);
+
+ private final boolean onlyConsumeFinishedPartition;
+
+ HybridConsumePartitionMode(boolean onlyConsumeFinishedPartition) {
+ this.onlyConsumeFinishedPartition = onlyConsumeFinishedPartition;
+ }
+
+ public boolean isOnlyConsumeFinishedPartition() {
+ return onlyConsumeFinishedPartition;
+ }
+ }
+
@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
- public static final ConfigOption<Boolean> ONLY_CONSUME_FINISHED_PARTITION =
- key("jobmanager.partition.hybrid.only-consume-finished-partition")
- .booleanType()
+ public static final ConfigOption<HybridConsumePartitionMode>
HYBRID_CONSUME_PARTITION_MODE =
+ key("jobmanager.partition.hybrid.consume-partition-mode")
+ .enumType(HybridConsumePartitionMode.class)
.noDefaultValue()
.withDescription(
Description.builder()
.text(
- "Controls whether the scheduler
only allows downstream task consume finished partition. "
- + "Note that this option
is allowed only when %s has been set to %s, "
- + "and if you also enable
speculative execution(%s has been set to true),"
- + "this option can only be
set to true.",
+ "Controls how the downstream task
consumes hybrid type producer edge. "
+ + "Note that this option
is allowed only when %s has been set to %s. "
+ + "Accepted values are:",
code(SCHEDULER.key()),
-
code(SchedulerType.AdaptiveBatch.name()),
- code(SPECULATIVE_ENABLED.key()))
+
code(SchedulerType.AdaptiveBatch.name()))
+ .list(
+ text(
+
"'ONLY_CONSUME_ALL_FINISHED': Downstream tasks can consume data only when all
producers are finished."),
Review Comment:
Use `HybridConsumePartitionMode.CAN_CONSUME_PARTIAL_FINISHED.name()`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup}. */
+public class CachedShuffleDescriptors {
+ /**
+ * Stores all serialized shuffle descriptors with indexes. For a
partition, it may be added a
+ * serialized unknown shuffle descriptor to this list first, and then
added the real descriptor
+ * later.
+ */
+ private final List<MaybeOffloaded<ShuffleDescriptorAndIndex[]>>
serializedShuffleDescriptors;
+
+ /**
+ * Stores all to-be-serialized shuffle descriptors. They will be
serialized and added to
+ * serializedShuffleDescriptors during the next time
TaskDeploymentDescriptor is generated.
+ */
+ private final Queue<ShuffleDescriptorAndIndex> toBeSerialized;
+
+ /** Stores the mapping of resultPartitionId to index subscripts in
consumed partition group. */
+ private final Map<IntermediateResultPartitionID, Integer>
resultPartitionIdToIndex;
+
+ public CachedShuffleDescriptors(
+ ConsumedPartitionGroup consumedPartitionGroup,
+ MaybeOffloaded<ShuffleDescriptorAndIndex[]>
initialSerializedShuffleDescriptors) {
+ this.resultPartitionIdToIndex = new HashMap<>();
+ int index = 0;
+ for (IntermediateResultPartitionID resultPartitionID :
consumedPartitionGroup) {
+ resultPartitionIdToIndex.put(resultPartitionID, index++);
+ }
+ this.toBeSerialized = new ArrayDeque<>(consumedPartitionGroup.size());
+ this.serializedShuffleDescriptors = new ArrayList<>();
+
this.serializedShuffleDescriptors.add(initialSerializedShuffleDescriptors);
+ }
+
+ public MaybeOffloaded<ShuffleDescriptorAndIndex[]>
getShuffleDescriptors(int index) {
Review Comment:
The concept `index` is confusing, because the same term has different
semantics in `resultPartitionIdToIndex` and here.
##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -650,24 +650,46 @@ public enum SchedulerType {
.withDescription(
"The JobManager's ResourceID. If not configured,
the ResourceID will be generated randomly.");
+ /** Type of consume partition mode of hybrid edge. */
+ public enum HybridConsumePartitionMode {
+ ONLY_CONSUME_ALL_FINISHED(true),
+ CAN_CONSUME_PARTIAL_FINISHED(true),
+ CAN_CONSUME_UN_FINISHED(false);
Review Comment:
```
public enum HybridPartitionDataConsumeConstraint {
ALL_PRODUCERS_FINISHED,
ONLY_FINISHED_PRODUCERS,
UNFINISHED_PRODUCERS
}
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup}. */
+public class CachedShuffleDescriptors {
+ /**
+ * Stores all serialized shuffle descriptors with indexes. For a
partition, it may be added a
+ * serialized unknown shuffle descriptor to this list first, and then
added the real descriptor
+ * later.
+ */
+ private final List<MaybeOffloaded<ShuffleDescriptorAndIndex[]>>
serializedShuffleDescriptors;
+
+ /**
+ * Stores all to-be-serialized shuffle descriptors. They will be
serialized and added to
+ * serializedShuffleDescriptors during the next time
TaskDeploymentDescriptor is generated.
+ */
+ private final Queue<ShuffleDescriptorAndIndex> toBeSerialized;
+
+ /** Stores the mapping of resultPartitionId to index subscripts in
consumed partition group. */
+ private final Map<IntermediateResultPartitionID, Integer>
resultPartitionIdToIndex;
+
+ public CachedShuffleDescriptors(
+ ConsumedPartitionGroup consumedPartitionGroup,
+ MaybeOffloaded<ShuffleDescriptorAndIndex[]>
initialSerializedShuffleDescriptors) {
Review Comment:
I wonder if we can pass in unserialized shuffle descriptors and serialize
them inside the cache. Currently, the descriptors are serialized both
outside and inside the cache, which is redundant.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link ShuffleDescriptor}s cache for a {@link ConsumedPartitionGroup}. */
+public class CachedShuffleDescriptors {
+ /**
+ * Stores all serialized shuffle descriptors with indexes. For a
partition, it may be added a
+ * serialized unknown shuffle descriptor to this list first, and then
added the real descriptor
+ * later.
+ */
+ private final List<MaybeOffloaded<ShuffleDescriptorAndIndex[]>>
serializedShuffleDescriptors;
+
+ /**
+ * Stores all to-be-serialized shuffle descriptors. They will be
serialized and added to
+ * serializedShuffleDescriptors during the next time
TaskDeploymentDescriptor is generated.
+ */
+ private final Queue<ShuffleDescriptorAndIndex> toBeSerialized;
+
+ /** Stores the mapping of resultPartitionId to index subscripts in
consumed partition group. */
+ private final Map<IntermediateResultPartitionID, Integer>
resultPartitionIdToIndex;
+
+ public CachedShuffleDescriptors(
+ ConsumedPartitionGroup consumedPartitionGroup,
+ MaybeOffloaded<ShuffleDescriptorAndIndex[]>
initialSerializedShuffleDescriptors) {
+ this.resultPartitionIdToIndex = new HashMap<>();
+ int index = 0;
+ for (IntermediateResultPartitionID resultPartitionID :
consumedPartitionGroup) {
+ resultPartitionIdToIndex.put(resultPartitionID, index++);
+ }
+ this.toBeSerialized = new ArrayDeque<>(consumedPartitionGroup.size());
+ this.serializedShuffleDescriptors = new ArrayList<>();
+
this.serializedShuffleDescriptors.add(initialSerializedShuffleDescriptors);
+ }
+
+ public MaybeOffloaded<ShuffleDescriptorAndIndex[]>
getShuffleDescriptors(int index) {
Review Comment:
I think we don't need `getShuffleDescriptors(int index)` and
`getSerializedShuffleDescriptorsSize()` at all. They are only used in a
for-loop in `IntermediateResult`, where we can simply use
`getAllSerializedShuffleDescriptors()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]