zhuzhurk commented on a change in pull request #16307: URL: https://github.com/apache/flink/pull/16307#discussion_r662092566
########## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.MemorySize; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Describe the name and the the different resource components of a slot sharing group. */ +@PublicEvolving +public class SlotSharingGroup implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + + /** How many cpu cores are needed. Can be null only if it is unknown. */ + @Nullable // can be null only for UNKNOWN + private final CPUResource cpuCores; + + /** How much task heap memory is needed. 
*/ + @Nullable // can be null only for UNKNOWN + private final MemorySize taskHeapMemory; + + /** How much task off-heap memory is needed. */ + @Nullable // can be null only for UNKNOWN + private final MemorySize taskOffHeapMemory; + + /** How much managed memory is needed. */ + @Nullable // can be null only for UNKNOWN + private final MemorySize managedMemory; + + /** A extensible field for user specified resources from {@link ResourceSpec}. */ Review comment: `ResourceSpec` -> `SlotSharingGroup` ########## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.MemorySize; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Describe the name and the the different resource components of a slot sharing group. */ +@PublicEvolving +public class SlotSharingGroup implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + + /** How many cpu cores are needed. Can be null only if it is unknown. */ + @Nullable // can be null only for UNKNOWN Review comment: Can we just have a `ResourceSpec` here and avoid adding variant resources? This would make things simpler (both in `SlotSharingGroup` and `SlotSharingGroupUtils`) and help avoid inconsistency if `ResourceSpec` changes. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ########## @@ -452,6 +457,33 @@ private boolean isUnboundedSource(final Transformation<?> transformation) { } } + transform Review comment: It looks to me like `slotSharingGroupResources` can differ between two cases: 1. a transformation without a SlotSharingGroup set; 2. the same transformation set with a "default" SlotSharingGroup. However, theoretically these are the same case. ########## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.MemorySize; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Describe the name and the the different resource components of a slot sharing group. */ +@PublicEvolving +public class SlotSharingGroup implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + + /** How many cpu cores are needed. Can be null only if it is unknown. */ + @Nullable // can be null only for UNKNOWN + private final CPUResource cpuCores; + + /** How much task heap memory is needed. */ + @Nullable // can be null only for UNKNOWN + private final MemorySize taskHeapMemory; + + /** How much task off-heap memory is needed. */ + @Nullable // can be null only for UNKNOWN + private final MemorySize taskOffHeapMemory; + + /** How much managed memory is needed. 
*/ + @Nullable // can be null only for UNKNOWN + private final MemorySize managedMemory; + + /** A extensible field for user specified resources from {@link ResourceSpec}. */ + private final Map<String, Double> externalResources = new HashMap<>(); + + private SlotSharingGroup( + String name, + CPUResource cpuCores, + MemorySize taskHeapMemory, + MemorySize taskOffHeapMemory, + MemorySize managedMemory, + Map<String, Double> extendedResources) { + this.name = checkNotNull(name); + this.cpuCores = checkNotNull(cpuCores); + this.taskHeapMemory = checkNotNull(taskHeapMemory); + this.taskOffHeapMemory = checkNotNull(taskOffHeapMemory); + this.managedMemory = checkNotNull(managedMemory); + this.externalResources.putAll(checkNotNull(extendedResources)); + } + + private SlotSharingGroup(String name) { + this.name = checkNotNull(name); + this.cpuCores = null; + this.taskHeapMemory = null; + this.taskOffHeapMemory = null; + this.managedMemory = null; + } + + public String getName() { + return name; + } + + public Optional<MemorySize> getManagedMemory() { + return Optional.ofNullable(managedMemory); + } + + public Optional<MemorySize> getTaskHeapMemory() { + return Optional.ofNullable(taskHeapMemory); + } + + public Optional<MemorySize> getTaskOffHeapMemory() { + return Optional.ofNullable(taskOffHeapMemory); + } + + public Optional<Double> getCpuCores() { + return Optional.ofNullable(cpuCores) + .map(cpuResource -> cpuResource.getValue().doubleValue()); + } + + public Map<String, Double> getExternalResources() { + return Collections.unmodifiableMap(externalResources); + } + + public static Builder newBuilder(String name) { + return new Builder(name); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj != null && obj.getClass() == SlotSharingGroup.class) { + SlotSharingGroup that = (SlotSharingGroup) obj; + return Objects.equals(this.cpuCores, that.cpuCores) + && Objects.equals(taskHeapMemory, that.taskHeapMemory) + && 
Objects.equals(taskOffHeapMemory, that.taskOffHeapMemory) + && Objects.equals(managedMemory, that.managedMemory) + && Objects.equals(externalResources, that.externalResources); + } + return false; + } + + @Override + public int hashCode() { + int result = Objects.hashCode(cpuCores); + result = 31 * result + Objects.hashCode(taskHeapMemory); + result = 31 * result + Objects.hashCode(taskOffHeapMemory); + result = 31 * result + Objects.hashCode(managedMemory); + result = 31 * result + externalResources.hashCode(); + return result; + } + + /** Builder for the {@link ResourceSpec}. */ Review comment: `ResourceSpec` -> `SlotSharingGroup` ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ########## @@ -189,6 +195,8 @@ private final List<JobListener> jobListeners = new ArrayList<>(); + private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>(); Review comment: Maybe a "default" group with `UNKNOWN` resources should be registered by default? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ########## @@ -2093,6 +2096,24 @@ public StreamGraph getStreamGraph(String jobName) { @Internal public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) { StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(); + + // There might be a resource deadlock when applying fine-grained resource management in + // batch jobs with PIPELINE edges. Users need to trigger the + // fine-grained.shuffle-mode.all-blocking to convert all edges to BLOCKING before we fix + // that issue. + if (configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH + && !slotSharingGroupResources.isEmpty()) { Review comment: It's possible that `slotSharingGroupResources` is not empty but only contains `UNKNOWN` resources. 
I think this kind of job should not be treated as fine-grained resource managed. ########## File path: flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.configuration.MemorySize; + +import org.junit.Test; + +import java.util.Collections; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link SlotSharingGroup}. 
*/ +public class SlotSharingGroupTest { + @Test + public void testBuildSlotSharingGroup() { Review comment: I would separate this test into a `testBuildSlotSharingGroupWithSpecificResource` and a `testBuildSlotSharingGroupWithUnknownResource` ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ########## @@ -312,6 +315,9 @@ public StreamGraph generate() { transform(transformation); } + // Set the slot sharing group resources Review comment: This comment is not needed I think ########## File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java ########## @@ -405,10 +406,23 @@ public String getSlotSharingGroup() { * <p>Initially, an operation is in the default slot sharing group. This can be explicitly set * using {@code setSlotSharingGroup("default")}. * - * @param slotSharingGroup The slot sharing group name. + * @param slotSharingGroup The slot sharing group's name. */ public void setSlotSharingGroup(String slotSharingGroup) { Review comment: It's better to rename the param as `slotSharingGroupName` ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ########## @@ -749,6 +750,46 @@ public void testSettingSavepointRestoreSettingsSetterOverrides() { equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint1"))); } + @Test + public void testConfigureSlotSharingGroupResource() { + final SlotSharingGroup ssg1 = + SlotSharingGroup.newBuilder("ssg1").setCpuCores(1).setTaskHeapMemoryMB(100).build(); + final SlotSharingGroup ssg2 = + SlotSharingGroup.newBuilder("ssg2").setCpuCores(2).setTaskHeapMemoryMB(200).build(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStream<Integer> source = env.fromElements(1).slotSharingGroup("ssg1"); + source.map(value -> value) + .slotSharingGroup(ssg2) + .addSink(new DiscardingSink<>()) + .slotSharingGroup(ssg1); + + 
final StreamGraph streamGraph = env.getStreamGraph(); + assertThat( + streamGraph.getSlotSharingGroupResource("ssg1").get(), + is(ResourceProfile.fromResources(1, 100))); + assertThat( Review comment: It would be better to also cover: 1. a transformation without a `SlotSharingGroup` set; 2. a transformation with the "default" `SlotSharingGroup` set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
