This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new ac2f0f099 [Feature] Simplify the configuration of runtime-mode and parallelism-default (#1764)
ac2f0f099 is described below

commit ac2f0f09986595b487408b0757b1a6a14c51b9dd
Author: 1996fanrui <[email protected]>
AuthorDate: Sat Oct 8 00:01:39 2022 +0800

    [Feature] Simplify the configuration of runtime-mode and parallelism-default (#1764)
---
 .../org/apache/streampark/common/conf/ConfigConst.scala      |  2 --
 .../streampark/flink/core/FlinkStreamingInitializer.scala    | 12 ------------
 2 files changed, 14 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 3a500b258..6233eef31 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -114,8 +114,6 @@ object ConfigConst {
 
   val KEY_YARN_APP_QUEUE = "yarn.application.queue"
 
-  val KEY_EXECUTION_RUNTIME_MODE = "flink.execution.runtime-mode"
-
   // ---table---
   val KEY_FLINK_TABLE_PLANNER = "flink.table.planner"
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index abb5301ca..16e2c9d21 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -21,16 +21,13 @@ import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util._
 
-import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.CoreOptions
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableConfig
 
 import java.io.File
 import collection.JavaConversions._
 import collection.Map
-import util.Try
 
 private[flink] object FlinkStreamingInitializer {
 
@@ -154,15 +151,6 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
 
   def initEnvironment(): Unit = {
     localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
-    Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
-      Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
-    } match {
-      case p if p > 0 => localStreamEnv.setParallelism(p)
-      case _ => throw new IllegalArgumentException("[StreamPark] parallelism must be > 0. ")
-    }
-
-    val executionMode = Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
-    localStreamEnv.setRuntimeMode(executionMode)
 
     apiType match {
       case ApiType.java if javaStreamEnvConfFunc != null => javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)

Reply via email to