This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7871a580f4972762cbd85c8588ae9fa46a816574
Author: Yangze Guo <[email protected]>
AuthorDate: Wed Jun 30 18:37:29 2021 +0800

    [FLINK-21925][core] Introduce SlotSharingGroupUtils which contains utility for SlotSharingGroup
---
 .../flink/api/common/operators/ResourceSpec.java   |  4 ++
 .../operators/util/SlotSharingGroupUtils.java      | 54 +++++++++++++++++++++
 .../operators/util/SlotSharingGroupUtilsTest.java  | 56 ++++++++++++++++++++++
 3 files changed, 114 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 6b24b66..9990edd 100755
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -332,6 +332,10 @@ public final class ResourceSpec implements Serializable {
         return new Builder(new CPUResource(cpuCores), MemorySize.ofMebiBytes(taskHeapMemoryMB));
     }
 
+    public static Builder newBuilder(double cpuCores, MemorySize taskHeapMemory) {
+        return new Builder(new CPUResource(cpuCores), taskHeapMemory);
+    }
+
     /** Builder for the {@link ResourceSpec}. */
     public static class Builder {
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtils.java
new file mode 100644
index 0000000..83c9f96
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.util;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.resources.ExternalResource;
+import org.apache.flink.util.Preconditions;
+
+import java.util.stream.Collectors;
+
+/** Utils for {@link SlotSharingGroup}. */
+public class SlotSharingGroupUtils {
+    public static ResourceSpec extractResourceSpec(SlotSharingGroup slotSharingGroup) {
+        if (!slotSharingGroup.getCpuCores().isPresent()) {
+            return ResourceSpec.UNKNOWN;
+        }
+
+        Preconditions.checkState(slotSharingGroup.getCpuCores().isPresent());
+        Preconditions.checkState(slotSharingGroup.getTaskHeapMemory().isPresent());
+        Preconditions.checkState(slotSharingGroup.getTaskOffHeapMemory().isPresent());
+        Preconditions.checkState(slotSharingGroup.getManagedMemory().isPresent());
+
+        return ResourceSpec.newBuilder(
+                        slotSharingGroup.getCpuCores().get(),
+                        slotSharingGroup.getTaskHeapMemory().get())
+                .setTaskOffHeapMemory(slotSharingGroup.getTaskOffHeapMemory().get())
+                .setManagedMemory(slotSharingGroup.getManagedMemory().get())
+                .setExtendedResources(
+                        slotSharingGroup.getExternalResources().entrySet().stream()
+                                .map(
+                                        entry ->
+                                                new ExternalResource(
+                                                        entry.getKey(), entry.getValue()))
+                                .collect(Collectors.toList()))
+                .build();
+    }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java
new file mode 100644
index 0000000..de72037
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.util;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.resources.ExternalResource;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** Tests for {@link SlotSharingGroupUtils}. */
+public class SlotSharingGroupUtilsTest {
+    @Test
+    public void testCovertToResourceSpec() {
+        final ExternalResource gpu = new ExternalResource("gpu", 1);
+        final ResourceSpec resourceSpec =
+                ResourceSpec.newBuilder(1.0, 100)
+                        .setManagedMemoryMB(200)
+                        .setTaskOffHeapMemoryMB(300)
+                        .setExtendedResource(gpu)
+                        .build();
+        final SlotSharingGroup slotSharingGroup1 =
+                SlotSharingGroup.newBuilder("ssg")
+                        .setCpuCores(resourceSpec.getCpuCores().getValue().doubleValue())
+                        .setTaskHeapMemory(resourceSpec.getTaskHeapMemory())
+                        .setTaskOffHeapMemory(resourceSpec.getTaskOffHeapMemory())
+                        .setManagedMemory(resourceSpec.getManagedMemory())
+                        .setExternalResource(gpu.getName(), gpu.getValue().doubleValue())
+                        .build();
+        final SlotSharingGroup slotSharingGroup2 = SlotSharingGroup.newBuilder("ssg").build();
+
+        assertThat(SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup1), is(resourceSpec));
+        assertThat(
+                SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup2),
+                is(ResourceSpec.UNKNOWN));
+    }
+}

Reply via email to