xintongsong commented on a change in pull request #16307:
URL: https://github.com/apache/flink/pull/16307#discussion_r661252023
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -452,6 +457,38 @@ private boolean isUnboundedSource(final Transformation<?> transformation) {
}
}
+        transform
+                .getSlotSharingGroup()
+                .ifPresent(
+                        slotSharingGroup -> {
+                            if (!slotSharingGroup.getResourceSpec().equals(ResourceSpec.UNKNOWN)) {
+                                resourceFromTransformation.putIfAbsent(
+                                        slotSharingGroup.getName(),
+                                        slotSharingGroup.getResourceSpec());
+                                if (!slotSharingGroup
+                                        .getResourceSpec()
+                                        .equals(
+                                                resourceFromTransformation.get(
+                                                        slotSharingGroup.getName()))) {
+                                    throw new IllegalArgumentException(
+                                            "The slot sharing group "
+                                                    + slotSharingGroup.getName()
+                                                    + " has been configured with two different resource spec.");
+                                } else {
+                                    if (!streamGraph
+                                            .getSlotSharingGroupResource(slotSharingGroup.getName())
+                                            .isPresent()) {
+                                        streamGraph.setSlotSharingGroupResource(
+                                                Collections.singletonMap(
+                                                        slotSharingGroup.getName(),
+                                                        ResourceProfile.fromResourceSpec(
+                                                                slotSharingGroup.getResourceSpec(),
+                                                                MemorySize.ZERO)));
+                                    }
+                                }
Review comment:
IIUC, the resource returned by `streamGraph.getSlotSharingGroupResource` can
silently take precedence over `resourceFromTransformation` if the same SSG is
configured with different resources, which is counter-intuitive to me. I'd
suggest always failing explicitly whenever a conflict is detected.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.resources.ExternalResource;
+import org.apache.flink.configuration.MemorySize;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Describes the name and the different resource components of a slot sharing group. */
+@PublicEvolving
+public class SlotSharingGroup implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String name;
+
+ private final ResourceSpec resourceSpec;
+
+ private SlotSharingGroup(String name, ResourceSpec resourceSpec) {
+ this.name = checkNotNull(name);
+ this.resourceSpec = checkNotNull(resourceSpec);
+ }
+
+ @Internal
+ public SlotSharingGroup(String name) {
+ this.name = checkNotNull(name);
+ this.resourceSpec = ResourceSpec.UNKNOWN;
+ }
+
+ @Internal
+ public String getName() {
+ return name;
+ }
+
+ @Internal
+ public ResourceSpec getResourceSpec() {
+ return resourceSpec;
+ }
Review comment:
Why are these `@Internal`? If these methods are not meant to be used by
users, I would suggest not having them in this API class at all, so that users
have fewer chances to make mistakes. E.g., if we don't want to expose
`ResourceSpec` to users, we can have an internal util that creates a
`ResourceSpec` from a `SlotSharingGroup`.
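A rough sketch of the suggested split, assuming the internal util lives in the same package as the API class so that it can read package-private fields. `ApiGroup`, `InternalSpec`, and `ApiGroupUtils.extractResourceSpec` are hypothetical stand-ins for `SlotSharingGroup`, `ResourceSpec`, and the proposed util; a single Java file cannot demonstrate cross-package visibility, so the comments describe the intent:

```java
import java.util.Objects;

/**
 * Stand-in for the user-facing API class: users can only construct it. The resource fields
 * are package-private, so internal code in the same package can read them, while user code
 * in other packages sees no getters at all.
 */
final class ApiGroup {
    final String name;
    final double cpuCores; // hypothetical resource field
    final int taskHeapMb; // hypothetical resource field

    private ApiGroup(String name, double cpuCores, int taskHeapMb) {
        this.name = Objects.requireNonNull(name);
        this.cpuCores = cpuCores;
        this.taskHeapMb = taskHeapMb;
    }

    public static ApiGroup of(String name, double cpuCores, int taskHeapMb) {
        return new ApiGroup(name, cpuCores, taskHeapMb);
    }
}

/** Hypothetical internal resource type that the API class never returns. */
final class InternalSpec {
    final double cpuCores;
    final int taskHeapMb;

    InternalSpec(double cpuCores, int taskHeapMb) {
        this.cpuCores = cpuCores;
        this.taskHeapMb = taskHeapMb;
    }
}

/** Hypothetical internal util: the only place that turns an ApiGroup into an InternalSpec. */
final class ApiGroupUtils {
    private ApiGroupUtils() {}

    static InternalSpec extractResourceSpec(ApiGroup group) {
        return new InternalSpec(group.cpuCores, group.taskHeapMb);
    }
}
```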
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java
##########
@@ -0,0 +1,130 @@
+
+ public static Builder newBuilder(String name) {
+ return new Builder(name);
+ }
+
+ /** Builder for the {@link ResourceSpec}. */
+ public static class Builder {
+
+ private String name;
+ private final ResourceSpec.Builder resourceSpecBuilder = ResourceSpec.newBuilder(0, 0);
Review comment:
1. One assumption of `ResourceSpec` is that CPU and task heap memory are
always explicitly provided, unless the spec is `UNKNOWN`. That is because every
task uses CPU and task heap memory. This builder currently does not ensure that.
2. If a slot sharing group is created with none of the resource fields set,
its resource spec should default to `UNKNOWN`, which gives it the default slot
resources. Currently, this results in a slot with zero resources, if not an
error.
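A simplified sketch of a builder that follows both points, assuming a hypothetical `SketchSpec` with `UNKNOWN` as a sentinel and only three resource components (CPU cores, task heap and managed memory in MB). These names do not exist in Flink; they only illustrate the validation rules:

```java
import java.util.Objects;

/** Hypothetical, simplified resource spec; UNKNOWN means "use the default slot resources". */
final class SketchSpec {
    static final SketchSpec UNKNOWN = new SketchSpec(null, null, 0);

    final Double cpuCores; // null only for UNKNOWN
    final Integer taskHeapMb; // null only for UNKNOWN
    final int managedMemoryMb; // optional component, 0 if not set

    SketchSpec(Double cpuCores, Integer taskHeapMb, int managedMemoryMb) {
        this.cpuCores = cpuCores;
        this.taskHeapMb = taskHeapMb;
        this.managedMemoryMb = managedMemoryMb;
    }

    boolean isUnknown() {
        return this == UNKNOWN;
    }
}

/** Hypothetical slot sharing group holding a name and a spec. */
final class SketchGroup {
    final String name;
    final SketchSpec spec;

    SketchGroup(String name, SketchSpec spec) {
        this.name = Objects.requireNonNull(name);
        this.spec = Objects.requireNonNull(spec);
    }
}

/** Hypothetical builder enforcing both review points. */
final class SketchGroupBuilder {
    private final String name;
    // Nullable wrappers record whether a field was set at all.
    private Double cpuCores;
    private Integer taskHeapMb;
    private Integer managedMemoryMb;

    SketchGroupBuilder(String name) {
        this.name = Objects.requireNonNull(name);
    }

    SketchGroupBuilder setCpuCores(double cpuCores) {
        if (cpuCores <= 0) throw new IllegalArgumentException("CPU cores must be positive.");
        this.cpuCores = cpuCores;
        return this;
    }

    SketchGroupBuilder setTaskHeapMb(int taskHeapMb) {
        if (taskHeapMb <= 0) throw new IllegalArgumentException("Task heap must be positive.");
        this.taskHeapMb = taskHeapMb;
        return this;
    }

    SketchGroupBuilder setManagedMemoryMb(int managedMemoryMb) {
        if (managedMemoryMb < 0) throw new IllegalArgumentException("Managed memory must be >= 0.");
        this.managedMemoryMb = managedMemoryMb;
        return this;
    }

    SketchGroup build() {
        if (cpuCores == null && taskHeapMb == null && managedMemoryMb == null) {
            // Point 2: nothing configured, so fall back to the default slot resources.
            return new SketchGroup(name, SketchSpec.UNKNOWN);
        }
        if (cpuCores == null || taskHeapMb == null) {
            // Point 1: once any resource is set, CPU and task heap become mandatory.
            throw new IllegalStateException(
                    "CPU cores and task heap memory must be set for slot sharing group " + name + ".");
        }
        int managed = managedMemoryMb == null ? 0 : managedMemoryMb;
        return new SketchGroup(name, new SketchSpec(cpuCores, taskHeapMb, managed));
    }
}
```

Tracking "not set" with nullable wrappers, instead of starting from `ResourceSpec.newBuilder(0, 0)`, is what lets `build()` distinguish an unconfigured group from one explicitly configured with zero resources.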
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -2090,6 +2092,25 @@ public StreamGraph getStreamGraph(String jobName) {
     @Internal
     public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
         StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
+
+        // There might be a resource deadlock when applying fine-grained resource management in
+        // batch jobs with PIPELINE edges. Users need to trigger the
+        // fine-grained.shuffle-mode.all-blocking to convert all edges to BLOCKING before we fix
+        // that issue in FLINK-20865.
+        if (configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
+                && !slotSharingGroupResources.isEmpty()) {
+            if (configuration.get(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING)) {
+                streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+            } else {
+                throw new IllegalStateException(
+                        "There might be a resource deadlock, which will be fixed in FLINK-20865, when "
+                                + "applying fine-grained resource management in batch jobs with PIPELINE edges. "
+                                + "As a temporary workaround, please configure the "
+                                + ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING.key()
+                                + " to true, with which Flink will convert those PIPELINE edges to BLOCKING.");
+            }
Review comment:
1. I would say "as described in FLINK-20865" rather than "will be fixed
in FLINK-20865". We do not know for sure that the problem will be fixed in
FLINK-20865, so we should not promise that to users.
2. It is important to state that converting PIPELINE edges to BLOCKING
might affect performance. Since `FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING` is
annotated with `ExcludeFromDocumentation`, users will not see its description.
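One possible wording that addresses both points, as a sketch only: `DeadlockMessages.pipelinedEdgesInBatchMessage` is a hypothetical helper, and the option key is passed in as a parameter (in the PR it would come from `ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING.key()`):

```java
/** Hypothetical helper illustrating an error message that follows both review points. */
final class DeadlockMessages {
    private DeadlockMessages() {}

    static String pipelinedEdgesInBatchMessage(String allBlockingOptionKey) {
        return "Applying fine-grained resource management to batch jobs with PIPELINE edges "
                // Point 1: refer to the issue instead of promising a fix.
                + "might cause a resource deadlock, as described in FLINK-20865. "
                + "As a temporary workaround, set "
                + allBlockingOptionKey
                + " to true, which makes Flink convert all PIPELINE edges to BLOCKING. "
                // Point 2: surface the performance trade-off, since the option is not documented.
                + "Note that this might affect the performance of the job.";
    }
}
```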
--
This is an automated message from the Apache Git Service.