This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 95f8b61  [FLINK-21099] Introduce JobType to distinguish between batch and streaming jobs
95f8b61 is described below

commit 95f8b61b60d57e2ddb7d4f43fa7e96cb95348d75
Author: Robert Metzger <[email protected]>
AuthorDate: Tue Jan 26 13:06:57 2021 +0100

    [FLINK-21099] Introduce JobType to distinguish between batch and streaming jobs
---
 .../optimizer/plantranslate/JobGraphGenerator.java |  2 ++
 .../plantranslate/JobGraphGeneratorTest.java       | 12 +++++++++
 .../apache/flink/runtime/jobgraph/JobGraph.java    | 10 ++++++++
 .../org/apache/flink/runtime/jobgraph/JobType.java | 30 ++++++++++++++++++++++
 .../flink/streaming/api/graph/StreamGraph.java     | 10 ++++++++
 .../streaming/api/graph/StreamGraphGenerator.java  |  2 ++
 .../api/graph/StreamingJobGraphGenerator.java      |  1 +
 .../StreamGraphGeneratorBatchExecutionTest.java    | 19 ++++++++++++++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 18 +++++++++++++
 .../flink/table/planner/utils/ExecutorUtils.java   |  5 ++++
 .../flink/test/runtime/BlockingShuffleITCase.java  |  6 +++++
 11 files changed, 115 insertions(+)

diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 4da3354..0d7fbda 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -246,6 +247,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
         // create the job graph object
         JobGraph graph = new JobGraph(jobId, program.getJobName());
+        graph.setJobType(JobType.BATCH);
         try {
             graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
         } catch (IOException e) {
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
index 8b15371..22d0ef8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.util.AbstractID;
 
@@ -58,9 +59,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class JobGraphGeneratorTest {
@@ -293,6 +296,15 @@ public class JobGraphGeneratorTest {
     }
 
     @Test
+    public void testGeneratedJobsAreBatchJobType() {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.fromElements("test").output(new DiscardingOutputFormat<>());
+
+        JobGraph graph = compileJob(env);
+        assertThat(graph.getJobType(), is(JobType.BATCH));
+    }
+
+    @Test
     public void testGeneratingJobGraphWithUnconsumedResultPartition() {
 
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 3a9ce0e..40dfe66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -83,6 +83,8 @@ public class JobGraph implements Serializable {
     /** The mode in which the job is scheduled. */
     private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
+    private JobType jobType = JobType.BATCH;
+
     /**
      * Whether approximate local recovery is enabled. This flag will be removed together with legacy
      * scheduling strategies.
@@ -239,6 +241,14 @@ public class JobGraph implements Serializable {
         return scheduleMode;
     }
 
+    public void setJobType(JobType type) {
+        this.jobType = type;
+    }
+
+    public JobType getJobType() {
+        return jobType;
+    }
+
     public void enableApproximateLocalRecovery(boolean enabled) {
         this.approximateLocalRecovery = enabled;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobType.java
new file mode 100644
index 0000000..c861ef0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+/** Enum to distinguish JobGraphs between batch and streaming, currently used by the scheduler. */
+public enum JobType {
+    /** Batch jobs are finite jobs, potentially consisting of multiple pipelined regions. */
+    BATCH,
+    /**
+     * Streaming jobs are infinite jobs, consisting of one large pipelined region, not separated by
+     * any blocking data exchanges.
+     */
+    STREAMING
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index dfcfe35..095543c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -117,6 +118,7 @@ public class StreamGraph implements Pipeline {
     private CheckpointStorage checkpointStorage;
     private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
     private InternalTimeServiceManager.Provider timerServiceProvider;
+    private JobType jobType = JobType.STREAMING;
 
     public StreamGraph(
             ExecutionConfig executionConfig,
@@ -931,4 +933,12 @@ public class StreamGraph implements Pipeline {
                 ? typeInfo.createSerializer(executionConfig)
                 : null;
     }
+
+    public void setJobType(JobType jobType) {
+        this.jobType = jobType;
+    }
+
+    public JobType getJobType() {
+        return jobType;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 59e76fc..7192437 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -289,6 +290,7 @@ public class StreamGraphGenerator {
         graph.setUserArtifacts(userArtifacts);
         graph.setTimeCharacteristic(timeCharacteristic);
         graph.setJobName(jobName);
+        graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING);
 
         if (shouldExecuteInBatchMode) {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index b83f295..50e4c1b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -159,6 +159,7 @@ public class StreamingJobGraphGenerator {
 
     private JobGraph createJobGraph() {
         preValidate();
+        jobGraph.setJobType(streamGraph.getJobType());
 
         // make sure that all vertices start immediately
         jobGraph.setScheduleMode(streamGraph.getScheduleMode());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java
index 1089283..81e750b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
@@ -68,6 +69,7 @@ import java.util.List;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -81,6 +83,23 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
     @Rule public ExpectedException expectedException = ExpectedException.none();
 
     @Test
+    public void testBatchJobType() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        SingleOutputStreamOperator<Integer> process =
+                env.fromElements(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
+        DataStreamSink<Integer> sink = process.addSink(new DiscardingSink<>());
+        StreamGraphGenerator graphGenerator =
+                new StreamGraphGenerator(
+                        Collections.singletonList(sink.getTransformation()),
+                        env.getConfig(),
+                        env.getCheckpointConfig());
+        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
+
+        StreamGraph graph = graphGenerator.generate();
+        assertThat(graph.getJobType(), is(JobType.BATCH));
+    }
+
+    @Test
     public void testOneInputTransformation() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         SingleOutputStreamOperator<Integer> process =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index f696bea..43c1426 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
 import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -716,6 +717,23 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
+    public void testStreamingJobTypeByDefault() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromElements("test").addSink(new DiscardingSink<>());
+        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        assertEquals(JobType.STREAMING, jobGraph.getJobType());
+    }
+
+    @Test
+    public void testBatchJobType() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.fromElements("test").addSink(new DiscardingSink<>());
+        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        assertEquals(JobType.BATCH, jobGraph.getJobType());
+    }
+
+    @Test
     public void testPartitionTypesInBatchMode() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
index 02bd028..78db759 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.utils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
@@ -67,6 +68,10 @@ public class ExecutorUtils {
                 .forEach(sn -> sn.setResources(ResourceSpec.UNKNOWN, ResourceSpec.UNKNOWN));
         streamGraph.setChaining(true);
         streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
+        // Configure job type for properly selecting a supported scheduler for batch jobs.
+        // LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST is only supported by the Batch scheduler (=Ng
+        // scheduler)
+        streamGraph.setJobType(JobType.BATCH);
         streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
         streamGraph.setStateBackend(null);
         streamGraph.setCheckpointStorage(null);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index 829dde6..b3de8e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -94,6 +95,11 @@ public class BlockingShuffleITCase {
         StreamGraph streamGraph = env.getStreamGraph();
         streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+        // a scheduler supporting batch jobs is required for this job graph, because it contains
+        // blocking data exchanges, and the "lazy from sources" schedule mode is not supported by
+        // the declarative scheduler.
+        // The scheduler is selected based on the JobType.
+        streamGraph.setJobType(JobType.BATCH);
         return StreamingJobGraphGenerator.createJobGraph(streamGraph);
     }
 

Reply via email to