xintongsong commented on a change in pull request #16307:
URL: https://github.com/apache/flink/pull/16307#discussion_r662020388



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##########
@@ -359,7 +359,7 @@ public Builder setTaskOffHeapMemory(MemorySize 
taskOffHeapMemory) {
             return this;
         }
 
-        public Builder setOffTaskHeapMemoryMB(int taskOffHeapMemoryMB) {
+        public Builder setTaskOffHeapMemoryMB(int taskOffHeapMemoryMB) {

Review comment:
       nit: in the commit message,
   "type" -> "typo"

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Describe the name and the the different resource components of a slot 
sharing group. */
+@PublicEvolving
+public class SlotSharingGroup implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String name;
+
+    /** How many cpu cores are needed. Can be null only if it is unknown. */
+    @Nullable // can be null only for UNKNOWN
+    private final CPUResource cpuCores;
+
+    /** How much task heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskHeapMemory;
+
+    /** How much task off-heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskOffHeapMemory;
+
+    /** How much managed memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize managedMemory;
+
+    /** A extensible field for user specified resources from {@link 
ResourceSpec}. */
+    private final Map<String, Double> externalResources = new HashMap<>();
+
+    private SlotSharingGroup(
+            String name,
+            CPUResource cpuCores,
+            MemorySize taskHeapMemory,
+            MemorySize taskOffHeapMemory,
+            MemorySize managedMemory,
+            Map<String, Double> extendedResources) {
+        this.name = checkNotNull(name);
+        this.cpuCores = checkNotNull(cpuCores);
+        this.taskHeapMemory = checkNotNull(taskHeapMemory);
+        this.taskOffHeapMemory = checkNotNull(taskOffHeapMemory);
+        this.managedMemory = checkNotNull(managedMemory);
+        this.externalResources.putAll(checkNotNull(extendedResources));
+    }
+
+    private SlotSharingGroup(String name) {
+        this.name = checkNotNull(name);
+        this.cpuCores = null;
+        this.taskHeapMemory = null;
+        this.taskOffHeapMemory = null;
+        this.managedMemory = null;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Optional<MemorySize> getManagedMemory() {
+        return Optional.ofNullable(managedMemory);
+    }
+
+    public Optional<MemorySize> getTaskHeapMemory() {
+        return Optional.ofNullable(taskHeapMemory);
+    }
+
+    public Optional<MemorySize> getTaskOffHeapMemory() {
+        return Optional.ofNullable(taskOffHeapMemory);
+    }
+
+    public Optional<Double> getCpuCores() {
+        return Optional.ofNullable(cpuCores)
+                .map(cpuResource -> cpuResource.getValue().doubleValue());
+    }
+
+    public Map<String, Double> getExternalResources() {
+        return Collections.unmodifiableMap(externalResources);
+    }
+
+    public static Builder newBuilder(String name) {
+        return new Builder(name);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        } else if (obj != null && obj.getClass() == SlotSharingGroup.class) {
+            SlotSharingGroup that = (SlotSharingGroup) obj;
+            return Objects.equals(this.cpuCores, that.cpuCores)
+                    && Objects.equals(taskHeapMemory, that.taskHeapMemory)
+                    && Objects.equals(taskOffHeapMemory, 
that.taskOffHeapMemory)
+                    && Objects.equals(managedMemory, that.managedMemory)
+                    && Objects.equals(externalResources, 
that.externalResources);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hashCode(cpuCores);
+        result = 31 * result + Objects.hashCode(taskHeapMemory);
+        result = 31 * result + Objects.hashCode(taskOffHeapMemory);
+        result = 31 * result + Objects.hashCode(managedMemory);
+        result = 31 * result + externalResources.hashCode();
+        return result;
+    }
+
+    /** Builder for the {@link ResourceSpec}. */
+    public static class Builder {
+
+        private String name;
+        private CPUResource cpuCores;
+        private MemorySize taskHeapMemory;
+        private MemorySize taskOffHeapMemory;
+        private MemorySize managedMemory;
+        private Map<String, Double> externalResources = new HashMap<>();
+
+        private Builder(String name) {
+            this.name = name;
+        }
+
+        /** Set the CPU cores for this SlotSharingGroup. */
+        public Builder setCpuCores(double cpuCores) {
+            checkArgument(cpuCores > 0, "The cpu cores should be positive.");
+            this.cpuCores = new CPUResource(cpuCores);
+            return this;
+        }
+
+        /** Set the task heap memory for this SlotSharingGroup. */
+        public Builder setTaskHeapMemory(MemorySize taskHeapMemory) {
+            checkArgument(
+                    taskHeapMemory.compareTo(MemorySize.ZERO) > 0,
+                    "The task heap memory should be positive.");
+            this.taskHeapMemory = taskHeapMemory;
+            return this;
+        }
+
+        /** Set the task heap memory for this SlotSharingGroup in MB. */
+        public Builder setTaskHeapMemoryMB(int taskHeapMemoryMB) {
+            checkArgument(taskHeapMemoryMB > 0, "The task heap memory should 
be positive.");
+            this.taskHeapMemory = MemorySize.ofMebiBytes(taskHeapMemoryMB);
+            return this;
+        }
+
+        /** Set the task off-heap memory for this SlotSharingGroup. */
+        public Builder setTaskOffHeapMemory(MemorySize taskOffHeapMemory) {
+            this.taskOffHeapMemory = taskOffHeapMemory;
+            return this;
+        }
+
+        /** Set the task off-heap memory for this SlotSharingGroup in MB. */
+        public Builder setTaskOffHeapMemoryMB(int taskOffHeapMemoryMB) {
+            this.taskOffHeapMemory = 
MemorySize.ofMebiBytes(taskOffHeapMemoryMB);
+            return this;
+        }
+
+        /** Set the task managed memory for this SlotSharingGroup. */
+        public Builder setManagedMemory(MemorySize managedMemory) {
+            this.managedMemory = managedMemory;
+            return this;
+        }
+
+        /** Set the task managed memory for this SlotSharingGroup in MB. */
+        public Builder setManagedMemoryMB(int managedMemoryMB) {
+            this.managedMemory = MemorySize.ofMebiBytes(managedMemoryMB);
+            return this;
+        }
+
+        /**
+         * Add the given external resource. The old value with the same 
resource name will be
+         * replaced if present.
+         */
+        public Builder setExternalResource(String name, double value) {
+            this.externalResources.put(name, value);
+            return this;
+        }
+
+        /** Build the SlotSharingGroup. */
+        public SlotSharingGroup build() {
+            if (cpuCores != null && taskHeapMemory != null) {
+                taskOffHeapMemory = 
Optional.ofNullable(taskOffHeapMemory).orElse(MemorySize.ZERO);
+                managedMemory = 
Optional.ofNullable(managedMemory).orElse(MemorySize.ZERO);
+                return new SlotSharingGroup(
+                        name,
+                        cpuCores,
+                        taskHeapMemory,
+                        taskOffHeapMemory,
+                        managedMemory,
+                        externalResources);
+            } else if (cpuCores != null
+                    || taskHeapMemory != null
+                    || taskOffHeapMemory != null
+                    || managedMemory != null
+                    || !externalResources.isEmpty()) {
+                throw new IllegalStateException(
+                        "The cpu cores and task heap memory are required when 
specific the resource of a slot sharing group. "
+                                + "You need to explicitly configure them with 
positive value.");

Review comment:
       1. `IllegalArgumentException` might be more suitable
   2. "when specific" -> "when specifying"

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtils.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.util;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.resources.ExternalResource;
+
+import java.util.stream.Collectors;
+
+/** Utils for {@link SlotSharingGroup}. */
+public class SlotSharingGroupUtils {
+    public static ResourceSpec toResourceSpec(SlotSharingGroup 
slotSharingGroup) {

Review comment:
       I'd suggest naming the method `extractResourceSpec`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -2092,6 +2094,27 @@ public StreamGraph getStreamGraph(String jobName) {
     @Internal
     public StreamGraph getStreamGraph(String jobName, boolean 
clearTransformations) {
         StreamGraph streamGraph = 
getStreamGraphGenerator().setJobName(jobName).generate();
+
+        // There might be a resource deadlock when applying fine-grained 
resource management in
+        // batch jobs with PIPELINE edges. Users need to trigger the
+        // fine-grained.shuffle-mode.all-blocking to convert all edges to 
BLOCKING before we fix
+        // that issue.
+        if (configuration.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.BATCH
+                && !slotSharingGroupResources.isEmpty()) {
+            if 
(configuration.get(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING)) {
+                
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+            } else {
+                throw new IllegalStateException(
+                        "As described in FLINK-20865, there might be a 
resource deadlock when applying "
+                                + "fine-grained resource management in batch 
jobs with PIPELINE edges. "
+                                + "As a temporary workaround, please configure 
the "
+                                + 
ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING.key()
+                                + " to true, with which Flink will convert 
those PIPELINE edges to BLOCKING. "
+                                + "Please not that data will be fully produced 
before sent to consumer tasks "
+                                + "for BLOCKING edges, that might affect the 
performance.");
+            }

Review comment:
       1. `IllegalStateException` -> `IllegalConfigurationException`
   2. Not sure we want to go into as much detail as explaining how blocking edges work. The 
error message should probably focus on "what is wrong" and "how to fix it".
   
   > At the moment, fine-grained resource management requires batch workloads 
to be executed with all edges being of type BLOCKING. To do that, you need to 
configure '<config-key>' to 'true'. Note that this may affect 
performance. See FLINK-20865 for more details.
   

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtils.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.util;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.resources.ExternalResource;
+
+import java.util.stream.Collectors;
+
+/** Utils for {@link SlotSharingGroup}. */
+public class SlotSharingGroupUtils {
+    public static ResourceSpec toResourceSpec(SlotSharingGroup 
slotSharingGroup) {
+        if (!slotSharingGroup.getCpuCores().isPresent()) {
+            return ResourceSpec.UNKNOWN;
+        }
+
+        return ResourceSpec.newBuilder(slotSharingGroup.getCpuCores().get(), 0)
+                .setTaskHeapMemory(slotSharingGroup.getTaskHeapMemory().get())
+                
.setTaskOffHeapMemory(slotSharingGroup.getTaskOffHeapMemory().get())
+                .setManagedMemory(slotSharingGroup.getManagedMemory().get())

Review comment:
       The IDE complains about calling `get()` without first checking that a value is present. We can 
add some assertions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to