This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3f36bbf79ce9b34228f20f2bba50c85d1b6a29e3 Author: Yangze Guo <[email protected]> AuthorDate: Fri Jun 25 15:50:06 2021 +0800 [FLINK-21925][core] Introduce StreamExecutionEnvironment#registerSlotSharingGroup --- ...st_stream_execution_environment_completeness.py | 3 +- .../environment/StreamExecutionEnvironment.java | 37 +++++++++++++++++- .../api/StreamExecutionEnvironmentTest.java | 44 ++++++++++++++++++++++ .../api/scala/StreamExecutionEnvironment.scala | 17 +++++++++ 4 files changed, 99 insertions(+), 2 deletions(-) diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py index 878d042..5f6a1a8 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py @@ -39,6 +39,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase, # Currently only the methods for configuration is added. # 'isForceCheckpointing', 'getNumberOfExecutionRetries', 'setNumberOfExecutionRetries' # is deprecated, exclude them. + # TODO the registerSlotSharingGroup should be removed from this list after FLINK-23165. return {'getLastJobExecutionResult', 'getId', 'getIdString', 'registerCachedFile', 'createCollectionsEnvironment', 'createLocalEnvironment', 'createRemoteEnvironment', 'addOperator', 'fromElements', @@ -48,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase, 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection', 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener', - 'clearJobListeners', 'getJobListeners', "fromSequence"} + 'clearJobListeners', 'getJobListeners', "fromSequence", "registerSlotSharingGroup"} if __name__ == '__main__': diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 77efe15..ba5d107 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -31,6 +31,9 @@ import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.operators.SlotSharingGroup; +import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -53,6 +56,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.RestOptions; @@ -64,6 +68,7 @@ import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendLoader; @@ -110,8 +115,10 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -189,6 +196,9 @@ public class StreamExecutionEnvironment { private final List<JobListener> jobListeners = new ArrayList<>(); + // Records the slot sharing groups and their corresponding fine-grained ResourceProfile + private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>(); + // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- @@ -337,6 +347,30 @@ public class StreamExecutionEnvironment { } /** + * Register a slot sharing group with its resource spec. + * + * <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be + * deployed into a shared slot. There's no guarantee that the scheduler always deploy the + * grouped operators together. In cases grouped operators are deployed into separate slots, the + * slot resources will be derived from the specified group requirements. + * + * @param slotSharingGroup which contains name and its resource spec. + */ + @PublicEvolving + public StreamExecutionEnvironment registerSlotSharingGroup(SlotSharingGroup slotSharingGroup) { + final ResourceSpec resourceSpec = + SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup); + if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) { + this.slotSharingGroupResources.put( + slotSharingGroup.getName(), + ResourceProfile.fromResourceSpec( + SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup), + MemorySize.ZERO)); + } + return this; + } + + /** * Gets the parallelism with which operation are executed by default. Operations can * individually override this value to use a specific parallelism. * @@ -2087,7 +2121,8 @@ public class StreamExecutionEnvironment { .setChaining(isChainingEnabled) .setUserArtifacts(cacheFile) .setTimeCharacteristic(timeCharacteristic) - .setDefaultBufferTimeout(bufferTimeout); + .setDefaultBufferTimeout(bufferTimeout) + .setSlotSharingGroupResource(slotSharingGroupResources); } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index f86dad2..683e31f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; @@ -25,6 +26,7 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -48,7 +50,10 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -275,6 +280,45 @@ public class StreamExecutionEnvironmentTest { } @Test + public void testRegisterSlotSharingGroup() { + final SlotSharingGroup ssg1 = + SlotSharingGroup.newBuilder("ssg1").setCpuCores(1).setTaskHeapMemoryMB(100).build(); + final SlotSharingGroup ssg2 = + SlotSharingGroup.newBuilder("ssg2").setCpuCores(2).setTaskHeapMemoryMB(200).build(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.registerSlotSharingGroup(ssg1); + env.registerSlotSharingGroup(ssg2); + env.registerSlotSharingGroup(SlotSharingGroup.newBuilder("ssg3").build()); + + final DataStream<Integer> source = env.fromElements(1).slotSharingGroup("ssg1"); + source.map(value -> value).slotSharingGroup(ssg2).addSink(new DiscardingSink<>()); + + final StreamGraph streamGraph = env.getStreamGraph(); + assertThat( + streamGraph.getSlotSharingGroupResource("ssg1").get(), + is(ResourceProfile.fromResources(1, 100))); + assertThat( + streamGraph.getSlotSharingGroupResource("ssg2").get(), + is(ResourceProfile.fromResources(2, 200))); + assertFalse(streamGraph.getSlotSharingGroupResource("ssg3").isPresent()); + } + + @Test(expected = IllegalArgumentException.class) + public void testRegisterSlotSharingGroupConflict() { + final SlotSharingGroup ssg = + SlotSharingGroup.newBuilder("ssg1").setCpuCores(1).setTaskHeapMemoryMB(100).build(); + final SlotSharingGroup ssgConflict = + SlotSharingGroup.newBuilder("ssg1").setCpuCores(2).setTaskHeapMemoryMB(200).build(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.registerSlotSharingGroup(ssg); + + final DataStream<Integer> source = env.fromElements(1).slotSharingGroup("ssg1"); + source.map(value -> value).slotSharingGroup(ssgConflict).addSink(new DiscardingSink<>()); + + env.getStreamGraph(); + } + + @Test public void testGetStreamGraph() { try { TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO; diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 3ee460c..d0551c3 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -24,6 +24,7 @@ import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolvi import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat} +import org.apache.flink.api.common.operators.SlotSharingGroup import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.connector.source.{Source, SourceSplit} @@ -107,6 +108,22 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { } /** + * Register a slot sharing group with its resource spec. + * + * <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be + * deployed into a shared slot. There's no guarantee that the scheduler always deploy the + * grouped operators together. In cases grouped operators are deployed into separate slots, the + * slot resources will be derived from the specified group requirements. + * + * @param slotSharingGroup which contains name and its resource spec. + */ + @PublicEvolving + def registerSlotSharingGroup(slotSharingGroup: SlotSharingGroup): StreamExecutionEnvironment = { + javaEnv.registerSlotSharingGroup(slotSharingGroup) + this + } + + /** * Returns the default parallelism for this execution environment. Note that this * value can be overridden by individual operations using [[DataStream#setParallelism(int)]] */
