This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 95f8b61 [FLINK-21099] Introduce JobType to distinguish between batch and streaming jobs
95f8b61 is described below
commit 95f8b61b60d57e2ddb7d4f43fa7e96cb95348d75
Author: Robert Metzger <[email protected]>
AuthorDate: Tue Jan 26 13:06:57 2021 +0100
[FLINK-21099] Introduce JobType to distinguish between batch and streaming jobs
---
.../optimizer/plantranslate/JobGraphGenerator.java | 2 ++
.../plantranslate/JobGraphGeneratorTest.java | 12 +++++++++
.../apache/flink/runtime/jobgraph/JobGraph.java | 10 ++++++++
.../org/apache/flink/runtime/jobgraph/JobType.java | 30 ++++++++++++++++++++++
.../flink/streaming/api/graph/StreamGraph.java | 10 ++++++++
.../streaming/api/graph/StreamGraphGenerator.java | 2 ++
.../api/graph/StreamingJobGraphGenerator.java | 1 +
.../StreamGraphGeneratorBatchExecutionTest.java | 19 ++++++++++++++
.../api/graph/StreamingJobGraphGeneratorTest.java | 18 +++++++++++++
.../flink/table/planner/utils/ExecutorUtils.java | 5 ++++
.../flink/test/runtime/BlockingShuffleITCase.java | 6 +++++
11 files changed, 115 insertions(+)
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 4da3354..0d7fbda 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -246,6 +247,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// create the job graph object
JobGraph graph = new JobGraph(jobId, program.getJobName());
+ graph.setJobType(JobType.BATCH);
try {
graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
} catch (IOException e) {
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
index 8b15371..22d0ef8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphUtils;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.util.AbstractID;
@@ -58,9 +59,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class JobGraphGeneratorTest {
@@ -293,6 +296,15 @@ public class JobGraphGeneratorTest {
}
@Test
+ public void testGeneratedJobsAreBatchJobType() {
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements("test").output(new DiscardingOutputFormat<>());
+
+ JobGraph graph = compileJob(env);
+ assertThat(graph.getJobType(), is(JobType.BATCH));
+ }
+
+ @Test
public void testGeneratingJobGraphWithUnconsumedResultPartition() {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 3a9ce0e..40dfe66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -83,6 +83,8 @@ public class JobGraph implements Serializable {
/** The mode in which the job is scheduled. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
+ private JobType jobType = JobType.BATCH;
+
/**
* Whether approximate local recovery is enabled. This flag will be
removed together with legacy
* scheduling strategies.
@@ -239,6 +241,14 @@ public class JobGraph implements Serializable {
return scheduleMode;
}
+ public void setJobType(JobType type) {
+ this.jobType = type;
+ }
+
+ public JobType getJobType() {
+ return jobType;
+ }
+
public void enableApproximateLocalRecovery(boolean enabled) {
this.approximateLocalRecovery = enabled;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobType.java
new file mode 100644
index 0000000..c861ef0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+/** Enum to distinguish JobGraphs between batch and streaming, currently used
by the scheduler. */
+public enum JobType {
+ /** Batch jobs are finite jobs, potentially consisting of multiple
pipelined regions. */
+ BATCH,
+ /**
+ * Streaming jobs are infinite jobs, consisting of one large pipelined
region, not separated by
+ * any blocking data exchanges.
+ */
+ STREAMING
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index dfcfe35..095543c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -117,6 +118,7 @@ public class StreamGraph implements Pipeline {
private CheckpointStorage checkpointStorage;
private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
private InternalTimeServiceManager.Provider timerServiceProvider;
+ private JobType jobType = JobType.STREAMING;
public StreamGraph(
ExecutionConfig executionConfig,
@@ -931,4 +933,12 @@ public class StreamGraph implements Pipeline {
? typeInfo.createSerializer(executionConfig)
: null;
}
+
+ public void setJobType(JobType jobType) {
+ this.jobType = jobType;
+ }
+
+ public JobType getJobType() {
+ return jobType;
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 59e76fc..7192437 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.state.CheckpointStorage;
@@ -289,6 +290,7 @@ public class StreamGraphGenerator {
graph.setUserArtifacts(userArtifacts);
graph.setTimeCharacteristic(timeCharacteristic);
graph.setJobName(jobName);
+ graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH :
JobType.STREAMING);
if (shouldExecuteInBatchMode) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index b83f295..50e4c1b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -159,6 +159,7 @@ public class StreamingJobGraphGenerator {
private JobGraph createJobGraph() {
preValidate();
+ jobGraph.setJobType(streamGraph.getJobType());
// make sure that all vertices start immediately
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java
index 1089283..81e750b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
@@ -68,6 +69,7 @@ import java.util.List;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
@@ -81,6 +83,23 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
@Rule public ExpectedException expectedException =
ExpectedException.none();
@Test
+ public void testBatchJobType() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ SingleOutputStreamOperator<Integer> process =
+ env.fromElements(1,
2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
+ DataStreamSink<Integer> sink = process.addSink(new DiscardingSink<>());
+ StreamGraphGenerator graphGenerator =
+ new StreamGraphGenerator(
+ Collections.singletonList(sink.getTransformation()),
+ env.getConfig(),
+ env.getCheckpointConfig());
+ graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
+
+ StreamGraph graph = graphGenerator.generate();
+ assertThat(graph.getJobType(), is(JobType.BATCH));
+ }
+
+ @Test
public void testOneInputTransformation() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Integer> process =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index f696bea..43c1426 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -716,6 +717,23 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
+ public void testStreamingJobTypeByDefault() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements("test").addSink(new DiscardingSink<>());
+ JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ assertEquals(JobType.STREAMING, jobGraph.getJobType());
+ }
+
+ @Test
+ public void testBatchJobType() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.fromElements("test").addSink(new DiscardingSink<>());
+ JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ assertEquals(JobType.BATCH, jobGraph.getJobType());
+ }
+
+ @Test
public void testPartitionTypesInBatchMode() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
index 02bd028..78db759 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.utils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
@@ -67,6 +68,10 @@ public class ExecutorUtils {
.forEach(sn -> sn.setResources(ResourceSpec.UNKNOWN,
ResourceSpec.UNKNOWN));
streamGraph.setChaining(true);
streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
+ // Configure job type for properly selecting a supported scheduler for
batch jobs.
+ // LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST is only supported by the
Batch scheduler (=Ng
+ // scheduler)
+ streamGraph.setJobType(JobType.BATCH);
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
streamGraph.setStateBackend(null);
streamGraph.setCheckpointStorage(null);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index 829dde6..b3de8e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -94,6 +95,11 @@ public class BlockingShuffleITCase {
StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+ // a scheduler supporting batch jobs is required for this job graph,
because it contains
+ // blocking data exchanges, and the "lazy from sources" schedule mode
is not supported by
+ // the declarative scheduler.
+ // The scheduler is selected based on the JobType.
+ streamGraph.setJobType(JobType.BATCH);
return StreamingJobGraphGenerator.createJobGraph(streamGraph);
}