aljoscha commented on a change in pull request #13502:
URL: https://github.com/apache/flink/pull/13502#discussion_r496566965



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/RuntimeExecutionMode.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The runtime execution mode based on which Flink will pick
+ * how to schedule the tasks of a given job and what will be the default 
semantics.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API">
+ *     
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API</a>
+ */
+@Internal
+public enum RuntimeExecutionMode {
+       STREAMING,

Review comment:
       Isn't the indentation wrong here? Should be a tab according to normal 
Flink style.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -182,24 +180,14 @@ public StreamGraphGenerator setJobName(String jobName) {
                return this;
        }
 
-       public StreamGraphGenerator 
setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) {
-               this.globalDataExchangeMode = globalDataExchangeMode;
-               return this;
-       }
-
        public void setSavepointRestoreSettings(SavepointRestoreSettings 
savepointRestoreSettings) {
                this.savepointRestoreSettings = savepointRestoreSettings;
        }
 
        public StreamGraph generate() {
+               runtimeExecutionMode = 
determineExecutionMode(runtimeExecutionMode);

Review comment:
       I think we should have a separate field and not modify the original one 
because that can make it hard to debug things. We could even just have a local 
`boolean` field that says `isBounded` because those are the only two options we 
have. There can be no `AUTOMATIC` from this point on. We would then also not need 
to have the various `verifyRuntimeModeIsSet()` checks later. WDYT?
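
A minimal sketch of this suggestion, assuming stand-in types rather than the real Flink classes: `ExecutionModeSketch`, `Generator`, `resolveIsBounded()` and the string returned by `generate()` are hypothetical names used only for illustration. The configured mode stays untouched in a `final` field, and `generate()` resolves it once into a local `boolean`.

```java
import java.util.List;

final class ExecutionModeSketch {

    // Stand-ins for the enums discussed in this PR (assumption: not the Flink classes).
    enum RuntimeExecutionMode { STREAMING, BATCH, AUTOMATIC }
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /** Hypothetical, stripped-down generator showing only the mode resolution. */
    static final class Generator {
        // The user-configured mode is kept as-is and never reassigned.
        private final RuntimeExecutionMode configuredMode;
        private final List<Boundedness> sourceBoundedness;

        Generator(RuntimeExecutionMode configuredMode, List<Boundedness> sourceBoundedness) {
            this.configuredMode = configuredMode;
            this.sourceBoundedness = sourceBoundedness;
        }

        String generate() {
            // Resolved once into a local; AUTOMATIC cannot occur past this line,
            // so no later "is the mode set?" checks are needed.
            final boolean isBounded = resolveIsBounded();
            return isBounded ? "BATCH" : "STREAMING";
        }

        private boolean resolveIsBounded() {
            switch (configuredMode) {
                case BATCH:
                    return true;
                case STREAMING:
                    return false;
                default:
                    // AUTOMATIC: bounded only if no source is unbounded.
                    return sourceBoundedness.stream().allMatch(b -> b == Boundedness.BOUNDED);
            }
        }

        RuntimeExecutionMode getConfiguredMode() {
            return configuredMode;
        }
    }
}
```

With this shape, the value the user configured is still visible while debugging, even after the graph has been generated.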

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/WithBoundedness.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Boundedness;
+
+/**
+ * An interface to be implemented by transformations that have explicitly set 
{@link Boundedness}.
+ * For now this is the {@link LegacySourceTransformation} and the {@link 
SourceTransformation}.
+ */

Review comment:
       This should be more concise, we don't need to mention the implementors 
here in the Javadoc.
   > {@link WithBoundedness} allows {@link Transformation Transformations} to 
expose their {@link Boundedness}.
   
   However, I think that if we need boundedness information on all 
`Transformations` in the future we might as well add the method to 
`Transformation` instead of adding an extra interface that we will eventually 
delete again.
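
A rough sketch of that alternative, using stand-in classes rather than the real `Transformation` hierarchy. The class names and the rule that a non-source transformation is unbounded if any of its inputs is unbounded are assumptions made for illustration, not something this PR defines.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class BoundednessOnTransformationSketch {

    // Stand-in for the Boundedness enum (assumption: not the Flink class).
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /** Hypothetical base class that exposes boundedness directly. */
    abstract static class Transformation {
        abstract List<Transformation> getInputs();

        // Assumed default: unbounded as soon as one input is unbounded.
        Boundedness getBoundedness() {
            for (Transformation input : getInputs()) {
                if (input.getBoundedness() != Boundedness.BOUNDED) {
                    return Boundedness.CONTINUOUS_UNBOUNDED;
                }
            }
            return Boundedness.BOUNDED;
        }
    }

    /** A source states its boundedness explicitly. */
    static final class Source extends Transformation {
        private final Boundedness boundedness;

        Source(Boundedness boundedness) {
            this.boundedness = boundedness;
        }

        @Override
        List<Transformation> getInputs() {
            return Collections.emptyList();
        }

        @Override
        Boundedness getBoundedness() {
            return boundedness;
        }
    }

    /** Any non-source transformation inherits the derived default. */
    static final class Operator extends Transformation {
        private final List<Transformation> inputs;

        Operator(Transformation... inputs) {
            this.inputs = Arrays.asList(inputs);
        }

        @Override
        List<Transformation> getInputs() {
            return inputs;
        }
    }
}
```

Callers would then ask any transformation for its boundedness without an `instanceof` check against a separate interface.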

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
                return builtStreamGraph;
        }
 
+       @VisibleForTesting
+       RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode 
chosenMode) {
+               if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) 
{
+                       return chosenMode;
+               }
+
+               final boolean continuousSourceExists = transformations
+                               .stream()
+                               .anyMatch(transformation ->
+                                               
isContinuousSource(transformation) ||
+                                               transformation
+                                                               
.getTransitivePredecessors()
+                                                               .stream()
+                                                               
.anyMatch(this::isContinuousSource));
+
+               return continuousSourceExists
+                               ? RuntimeExecutionMode.STREAMING
+                               : RuntimeExecutionMode.BATCH;
+       }
+
+       private boolean isContinuousSource(final Transformation<?> 
transformation) {
+               checkNotNull(transformation);
+               return transformation instanceof WithBoundedness &&
+                               ((WithBoundedness) 
transformation).getBoundedness() != Boundedness.BOUNDED;
+       }
+
+       private void configureStreamGraph(
+                       final StreamGraph graph,
+                       final RuntimeExecutionMode runtimeExecutionMode) {
+               checkNotNull(graph);
+               checkNotNull(runtimeExecutionMode);
+
+               // by now we must have disambiguated the runtime-mode in which 
to execute the pipeline.
+               verifyRuntimeModeIsSet();
+
+               graph.setStateBackend(stateBackend);
+               graph.setChaining(chaining);
+               graph.setUserArtifacts(userArtifacts);
+               graph.setTimeCharacteristic(timeCharacteristic);
+               graph.setJobName(jobName);
+
+               switch (runtimeExecutionMode) {
+                       case STREAMING:
+                               
graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
+                               
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+                               graph.setScheduleMode(ScheduleMode.EAGER);
+                               break;
+                       case BATCH:
+                               // TODO: 24.09.20 copied from 
ExecutorUtils.setBatchProperties(StreamGraph...). Ask if correct!!!

Review comment:
       @dawidwys should know more.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
                return builtStreamGraph;
        }
 
+       @VisibleForTesting
+       RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode 
chosenMode) {
+               if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) 
{
+                       return chosenMode;
+               }
+
+               final boolean continuousSourceExists = transformations
+                               .stream()
+                               .anyMatch(transformation ->
+                                               
isContinuousSource(transformation) ||
+                                               transformation
+                                                               
.getTransitivePredecessors()
+                                                               .stream()
+                                                               
.anyMatch(this::isContinuousSource));
+
+               return continuousSourceExists
+                               ? RuntimeExecutionMode.STREAMING
+                               : RuntimeExecutionMode.BATCH;
+       }
+
+       private boolean isContinuousSource(final Transformation<?> 
transformation) {

Review comment:
       We try and keep the terminology consistent. Maybe this should be 
`isUnboundedSource()`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -1061,9 +1064,11 @@ private StateBackend loadStateBackend(ReadableConfig 
configuration, ClassLoader
        }
 
        // private helper for passing different names
-       private <OUT> DataStreamSource<OUT> 
fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
-                       typeInfo, String operatorName) {
-               return addSource(new 
FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo);
+       private <OUT> DataStreamSource<OUT> fromParallelCollection(
+                       SplittableIterator<OUT> iterator, TypeInformation<OUT>

Review comment:
       nit, weird formatting: the `TypeInformation...` parameter should be on a 
newline

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/RuntimeExecutionMode.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The runtime execution mode based on which Flink will pick
+ * how to schedule the tasks of a given job and what will be the default 
semantics.
+ *

Review comment:
       I think we can be more concise:
   
   > Runtime execution mode of DataStream programs. Among other things, this 
controls task scheduling, network shuffle behavior, and time semantics. Some 
operations will also change their record emission behaviour based on the 
configured execution mode.
   
   
   Also, I think this should be in the `o.a.f.streaming.api` package. The 
`graph` package is really an internal package that should be in 
`o.a.f.s.runtime`. (Maybe we should change that.)

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorRuntimeExecutionModeDetection.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link 
StreamGraphGenerator#determineExecutionMode(RuntimeExecutionMode)}.
+ */
+public class StreamGraphGeneratorRuntimeExecutionModeDetection extends 
TestLogger {

Review comment:
       The name is a bit long and doesn't end with `Test`, as tests should.
   
   This could be added to `StreamGraphGeneratorTest`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

