This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7871a580f4972762cbd85c8588ae9fa46a816574 Author: Yangze Guo <[email protected]> AuthorDate: Wed Jun 30 18:37:29 2021 +0800 [FLINK-21925][core] Introduce SlotSharingGroupUtils which contains utility for SlotSharingGroup --- .../flink/api/common/operators/ResourceSpec.java | 4 ++ .../operators/util/SlotSharingGroupUtils.java | 54 +++++++++++++++++++++ .../operators/util/SlotSharingGroupUtilsTest.java | 56 ++++++++++++++++++++++ 3 files changed, 114 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java index 6b24b66..9990edd 100755 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java @@ -332,6 +332,10 @@ public final class ResourceSpec implements Serializable { return new Builder(new CPUResource(cpuCores), MemorySize.ofMebiBytes(taskHeapMemoryMB)); } + public static Builder newBuilder(double cpuCores, MemorySize taskHeapMemory) { + return new Builder(new CPUResource(cpuCores), taskHeapMemory); + } + /** Builder for the {@link ResourceSpec}. */ public static class Builder { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtils.java new file mode 100644 index 0000000..83c9f96 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators.util; + +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.operators.SlotSharingGroup; +import org.apache.flink.api.common.resources.ExternalResource; +import org.apache.flink.util.Preconditions; + +import java.util.stream.Collectors; + +/** Utils for {@link SlotSharingGroup}. */ +public class SlotSharingGroupUtils { + public static ResourceSpec extractResourceSpec(SlotSharingGroup slotSharingGroup) { + if (!slotSharingGroup.getCpuCores().isPresent()) { + return ResourceSpec.UNKNOWN; + } + + Preconditions.checkState(slotSharingGroup.getCpuCores().isPresent()); + Preconditions.checkState(slotSharingGroup.getTaskHeapMemory().isPresent()); + Preconditions.checkState(slotSharingGroup.getTaskOffHeapMemory().isPresent()); + Preconditions.checkState(slotSharingGroup.getManagedMemory().isPresent()); + + return ResourceSpec.newBuilder( + slotSharingGroup.getCpuCores().get(), + slotSharingGroup.getTaskHeapMemory().get()) + .setTaskOffHeapMemory(slotSharingGroup.getTaskOffHeapMemory().get()) + .setManagedMemory(slotSharingGroup.getManagedMemory().get()) + .setExtendedResources( + slotSharingGroup.getExternalResources().entrySet().stream() + .map( + entry -> + new ExternalResource( + entry.getKey(), entry.getValue())) + .collect(Collectors.toList())) + .build(); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java new file mode 100644 index 0000000..de72037 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/SlotSharingGroupUtilsTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators.util; + +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.operators.SlotSharingGroup; +import org.apache.flink.api.common.resources.ExternalResource; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** Tests for {@link SlotSharingGroupUtils}. */ +public class SlotSharingGroupUtilsTest { + @Test + public void testCovertToResourceSpec() { + final ExternalResource gpu = new ExternalResource("gpu", 1); + final ResourceSpec resourceSpec = + ResourceSpec.newBuilder(1.0, 100) + .setManagedMemoryMB(200) + .setTaskOffHeapMemoryMB(300) + .setExtendedResource(gpu) + .build(); + final SlotSharingGroup slotSharingGroup1 = + SlotSharingGroup.newBuilder("ssg") + .setCpuCores(resourceSpec.getCpuCores().getValue().doubleValue()) + .setTaskHeapMemory(resourceSpec.getTaskHeapMemory()) + .setTaskOffHeapMemory(resourceSpec.getTaskOffHeapMemory()) + .setManagedMemory(resourceSpec.getManagedMemory()) + .setExternalResource(gpu.getName(), gpu.getValue().doubleValue()) + .build(); + final SlotSharingGroup slotSharingGroup2 = SlotSharingGroup.newBuilder("ssg").build(); + + assertThat(SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup1), is(resourceSpec)); + assertThat( + SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup2), + is(ResourceSpec.UNKNOWN)); + } +}
