This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bc5f36d9fb063a5af1264ae58bbea9f7c0d54f3b
Author: Yangze Guo <[email protected]>
AuthorDate: Fri Jul 2 15:15:50 2021 +0800

    [hotfix][core] Check the cpu and heap memory to be positive when building ResourceSpec
---
 .../org/apache/flink/api/common/operators/ResourceSpec.java   | 11 ++++++++++-
 .../org/apache/flink/runtime/dispatcher/DispatcherTest.java   |  2 +-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index f66f2a3..6b24b66 100755
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -70,7 +71,13 @@ public final class ResourceSpec implements Serializable {
     public static final ResourceSpec DEFAULT = UNKNOWN;
 
     /** A ResourceSpec that indicates zero amount of resources. */
-    public static final ResourceSpec ZERO = ResourceSpec.newBuilder(0.0, 0).build();
+    public static final ResourceSpec ZERO =
+            new ResourceSpec(
+                    new CPUResource(0.0),
+                    MemorySize.ZERO,
+                    MemorySize.ZERO,
+                    MemorySize.ZERO,
+                    Collections.emptyMap());
 
     /** How many cpu cores are needed. Can be null only if it is unknown. */
     @Nullable private final CPUResource cpuCores;
@@ -397,6 +404,8 @@ public final class ResourceSpec implements Serializable {
         }
 
         public ResourceSpec build() {
+            checkArgument(cpuCores.getValue().compareTo(BigDecimal.ZERO) > 0);
+            checkArgument(taskHeapMemory.compareTo(MemorySize.ZERO) > 0);
             return new ResourceSpec(
                     cpuCores, taskHeapMemory, taskOffHeapMemory, managedMemory, extendedResources);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 4cc63e0..3b7b210 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -269,7 +269,7 @@ public class DispatcherTest extends TestLogger {
      */
     @Test
     public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
-        ResourceSpec resourceSpec = ResourceSpec.newBuilder(2.0, 0).build();
+        ResourceSpec resourceSpec = ResourceSpec.newBuilder(2.0, 10).build();
 
         final JobVertex firstVertex = new JobVertex("firstVertex");
         firstVertex.setInvokableClass(NoOpInvokable.class);
