jeyhunkarimov commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1583695410


##########
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroupDescriptor.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The descriptor that describe the name and the different resource components 
of a slot sharing
+ * group.
+ */
+@Experimental
+public class SlotSharingGroupDescriptor {
+    private final String name;
+
+    /** How many cpu cores are needed. Can be null only if it is unknown. */
+    @Nullable // can be null only for UNKNOWN
+    private final Double cpuCores;
+
+    /** How much task heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskHeapMemory;
+
+    /** How much task off-heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskOffHeapMemory;
+
+    /** How much managed memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize managedMemory;
+
+    /** A extensible field for user specified resources from {@link 
SlotSharingGroupDescriptor}. */
+    private final Map<String, Double> externalResources = new HashMap<>();
+
+    private SlotSharingGroupDescriptor(
+            String name,
+            @Nullable Double cpuCores,
+            @Nullable MemorySize taskHeapMemory,
+            @Nullable MemorySize taskOffHeapMemory,
+            @Nullable MemorySize managedMemory,
+            @Nullable Map<String, Double> extendedResources) {
+        this.name = name;
+        this.cpuCores = cpuCores;
+        this.taskHeapMemory = taskHeapMemory;
+        this.taskOffHeapMemory = taskOffHeapMemory;
+        this.managedMemory = managedMemory;
+        this.externalResources.putAll(extendedResources);
+    }
+
+    private SlotSharingGroupDescriptor(String name) {
+        this.name = name;
+        this.cpuCores = null;
+        this.taskHeapMemory = null;
+        this.taskOffHeapMemory = null;
+        this.managedMemory = null;
+    }

Review Comment:
   Maybe utilize the the above constructor (e.g., `this(name, null, null, null, 
null)` ?



##########
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroupDescriptor.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The descriptor that describe the name and the different resource components 
of a slot sharing
+ * group.
+ */
+@Experimental
+public class SlotSharingGroupDescriptor {
+    private final String name;
+
+    /** How many cpu cores are needed. Can be null only if it is unknown. */
+    @Nullable // can be null only for UNKNOWN
+    private final Double cpuCores;
+
+    /** How much task heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskHeapMemory;
+
+    /** How much task off-heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskOffHeapMemory;
+
+    /** How much managed memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize managedMemory;
+
+    /** A extensible field for user specified resources from {@link 
SlotSharingGroupDescriptor}. */
+    private final Map<String, Double> externalResources = new HashMap<>();

Review Comment:
   I think this map should be immutable. So, maybe initialize  it in 
constructor via `Collections.unmodifiableMap ` ?



##########
flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java:
##########
@@ -137,6 +137,37 @@ public int hashCode() {
         return result;
     }
 
+    /** Convert a {@link SlotSharingGroupDescriptor} to {@link 
SlotSharingGroup}. */
+    public static SlotSharingGroup fromDescriptor(SlotSharingGroupDescriptor 
descriptor) {
+        if (descriptor.getCpuCores() != null && descriptor.getTaskHeapMemory() 
!= null) {
+            MemorySize taskOffHeapMemory =
+                    descriptor.getTaskOffHeapMemory() == null
+                            ? MemorySize.ZERO
+                            : descriptor.getTaskOffHeapMemory();
+            MemorySize managedMemory =
+                    descriptor.getManagedMemory() == null
+                            ? MemorySize.ZERO
+                            : descriptor.getManagedMemory();
+            return new SlotSharingGroup(
+                    descriptor.getName(),
+                    new CPUResource(descriptor.getCpuCores()),
+                    descriptor.getTaskHeapMemory(),
+                    taskOffHeapMemory,
+                    managedMemory,
+                    descriptor.getExternalResources());
+        } else if (descriptor.getCpuCores() != null
+                || descriptor.getTaskHeapMemory() != null
+                || descriptor.getTaskOffHeapMemory() != null
+                || descriptor.getManagedMemory() != null
+                || !descriptor.getExternalResources().isEmpty()) {
+            throw new IllegalArgumentException(
+                    "The cpu cores and task heap memory are required when 
specifying the resource of a slot sharing group. "
+                            + "You need to explicitly configure them with 
positive value.");
+        } else {
+            return new SlotSharingGroup(descriptor.getName());
+        }

Review Comment:
   WDYT about simplifying a bit?
   
   ```
       Objects.requireNonNull(descriptor, "Descriptor cannot be null");
   
       Double cpuCores = descriptor.getCpuCores();
       MemorySize taskHeapMemory = descriptor.getTaskHeapMemory();
   
       if (cpuCores != null && cpuCores > 0 && taskHeapMemory != null && 
taskHeapMemory.compareTo(MemorySize.ZERO) > 0) {
           MemorySize taskOffHeapMemory = descriptor.getTaskOffHeapMemory() != 
null ? descriptor.getTaskOffHeapMemory() : MemorySize.ZERO;
           MemorySize managedMemory = descriptor.getManagedMemory() != null ? 
descriptor.getManagedMemory() : MemorySize.ZERO;
           return new SlotSharingGroup(
                   descriptor.getName(),
                   new CPUResource(cpuCores),
                   taskHeapMemory,
                   taskOffHeapMemory,
                   managedMemory,
                   descriptor.getExternalResources());
       } else {
           throw new IllegalArgumentException("Invalid descriptor. Both CPU 
cores and task heap memory must be specified with positive values.");
       }
   
   ```



##########
flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java:
##########
@@ -260,10 +264,14 @@ public static MemorySize parse(String text, MemoryUnit 
defaultUnit)
      * @throws IllegalArgumentException Thrown, if the expression cannot be 
parsed.
      */
     public static long parseBytes(String text) throws IllegalArgumentException 
{
-        checkNotNull(text, "text");
+        if (text == null) {
+            throw new NullPointerException("text is null");
+        }

Review Comment:
   `Objects.requireNonNull ` ?



##########
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroupDescriptor.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The descriptor that describe the name and the different resource components 
of a slot sharing
+ * group.
+ */
+@Experimental
+public class SlotSharingGroupDescriptor {
+    private final String name;
+
+    /** How many cpu cores are needed. Can be null only if it is unknown. */
+    @Nullable // can be null only for UNKNOWN
+    private final Double cpuCores;
+
+    /** How much task heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskHeapMemory;
+
+    /** How much task off-heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskOffHeapMemory;
+
+    /** How much managed memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize managedMemory;
+
+    /** A extensible field for user specified resources from {@link 
SlotSharingGroupDescriptor}. */
+    private final Map<String, Double> externalResources = new HashMap<>();
+
+    private SlotSharingGroupDescriptor(
+            String name,
+            @Nullable Double cpuCores,
+            @Nullable MemorySize taskHeapMemory,
+            @Nullable MemorySize taskOffHeapMemory,
+            @Nullable MemorySize managedMemory,
+            @Nullable Map<String, Double> extendedResources) {
+        this.name = name;
+        this.cpuCores = cpuCores;
+        this.taskHeapMemory = taskHeapMemory;
+        this.taskOffHeapMemory = taskOffHeapMemory;
+        this.managedMemory = managedMemory;
+        this.externalResources.putAll(extendedResources);

Review Comment:
   This might throw `NullPointerException` when `extendedResources` is `null`



##########
flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java:
##########
@@ -382,10 +390,15 @@ public static String getAllUnits() {
         }
 
         public static boolean hasUnit(String text) {
-            checkNotNull(text, "text");
+            if (text == null) {
+                throw new NullPointerException("text is null");
+            }

Review Comment:
   `Objects.requireNonNull ` ?
   
   Also, maybe instead of "text is null", we can use "text cannot be null" ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to