aljoscha commented on a change in pull request #13502: URL: https://github.com/apache/flink/pull/13502#discussion_r496566965
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/RuntimeExecutionMode.java ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.annotation.Internal; + +/** + * The runtime execution mode based on which Flink will pick + * how to schedule the tasks of a given job and what will be the default semantics. + * + * @see <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API"> + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API</a> + */ +@Internal +public enum RuntimeExecutionMode { + STREAMING, Review comment: Isn't the indentation wrong here? Should be a tab according to normal Flink style. 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ########## @@ -182,24 +180,14 @@ public StreamGraphGenerator setJobName(String jobName) { return this; } - public StreamGraphGenerator setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) { - this.globalDataExchangeMode = globalDataExchangeMode; - return this; - } - public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { this.savepointRestoreSettings = savepointRestoreSettings; } public StreamGraph generate() { + runtimeExecutionMode = determineExecutionMode(runtimeExecutionMode); Review comment: I think we should have a separate field and not modify the original one because that can make it hard to debug things. We could even just have a local `boolean` field that says `isBounded` because those are the only two options we have. There can be no `AUTOMATIC` from this point on. We would then also not need to have the various `verifyRuntimeModeIsSet()` checks later. WDYT? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/WithBoundedness.java ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.transformations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.Boundedness; + +/** + * An interface to be implemented by transformations that have explicitly set {@link Boundedness}. + * For now this is the {@link LegacySourceTransformation} and the {@link SourceTransformation}. + */ Review comment: This should be more concise; we don't need to mention the implementors here in the Javadoc. > {@link WithBoundedness} allows {@link Transformation Transformations} to expose their {@link Boundedness}. However, I think that if we need boundedness information on all `Transformations` in the future we might as well add the method to `Transformation` instead of adding an extra interface that we will eventually delete again. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ########## @@ -216,13 +204,74 @@ public StreamGraph generate() { return builtStreamGraph; } + @VisibleForTesting + RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) { + if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) { + return chosenMode; + } + + final boolean continuousSourceExists = transformations + .stream() + .anyMatch(transformation -> + isContinuousSource(transformation) || + transformation + .getTransitivePredecessors() + .stream() + .anyMatch(this::isContinuousSource)); + + return continuousSourceExists + ?
RuntimeExecutionMode.STREAMING + : RuntimeExecutionMode.BATCH; + } + + private boolean isContinuousSource(final Transformation<?> transformation) { + checkNotNull(transformation); + return transformation instanceof WithBoundedness && + ((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED; + } + + private void configureStreamGraph( + final StreamGraph graph, + final RuntimeExecutionMode runtimeExecutionMode) { + checkNotNull(graph); + checkNotNull(runtimeExecutionMode); + + // by now we must have disambiguated the runtime-mode in which to execute the pipeline. + verifyRuntimeModeIsSet(); + + graph.setStateBackend(stateBackend); + graph.setChaining(chaining); + graph.setUserArtifacts(userArtifacts); + graph.setTimeCharacteristic(timeCharacteristic); + graph.setJobName(jobName); + + switch (runtimeExecutionMode) { + case STREAMING: + graph.setAllVerticesInSameSlotSharingGroupByDefault(true); + graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED); + graph.setScheduleMode(ScheduleMode.EAGER); + break; + case BATCH: + // TODO: 24.09.20 copied from ExecutorUtils.setBatchProperties(StreamGraph...). Ask if correct!!! Review comment: @dawidwys should know more. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ########## @@ -216,13 +204,74 @@ public StreamGraph generate() { return builtStreamGraph; } + @VisibleForTesting + RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) { + if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) { + return chosenMode; + } + + final boolean continuousSourceExists = transformations + .stream() + .anyMatch(transformation -> + isContinuousSource(transformation) || + transformation + .getTransitivePredecessors() + .stream() + .anyMatch(this::isContinuousSource)); + + return continuousSourceExists + ? 
RuntimeExecutionMode.STREAMING + : RuntimeExecutionMode.BATCH; + } + + private boolean isContinuousSource(final Transformation<?> transformation) { Review comment: We try and keep the terminology consistent. Maybe this should be `isUnboundedSource()`? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ########## @@ -1061,9 +1064,11 @@ private StateBackend loadStateBackend(ReadableConfig configuration, ClassLoader } // private helper for passing different names - private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> - typeInfo, String operatorName) { - return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo); + private <OUT> DataStreamSource<OUT> fromParallelCollection( + SplittableIterator<OUT> iterator, TypeInformation<OUT> Review comment: nit, weird formatting: the `TypeInformation...` parameter should be on a newline ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/RuntimeExecutionMode.java ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.annotation.Internal; + +/** + * The runtime execution mode based on which Flink will pick + * how to schedule the tasks of a given job and what will be the default semantics. + * Review comment: I think we can be more concise: > Runtime execution mode of DataStream programs. Among other things, this controls task scheduling, network shuffle behavior, and time semantics. Some operations will also change their record emission behaviour based on the configured execution mode. Also, I think this should be in the `o.a.f.streaming.api` package. The `graph` package is really an internal package that should be in `o.a.f.s.runtime`. (Maybe we should change that.) ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorRuntimeExecutionModeDetection.java ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.IntegerTypeInfo; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.mocks.MockSource; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link StreamGraphGenerator#determineExecutionMode(RuntimeExecutionMode)}. + */ +public class StreamGraphGeneratorRuntimeExecutionModeDetection extends TestLogger { Review comment: The name is a bit long and doesn't end with `Test`, as test class names should. This could be added to `StreamGraphGeneratorTest`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
