This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new ac2f0f099 [Feature] Simplify the configuration of runtime-mode and
parallelism-default (#1764)
ac2f0f099 is described below
commit ac2f0f09986595b487408b0757b1a6a14c51b9dd
Author: 1996fanrui <[email protected]>
AuthorDate: Sat Oct 8 00:01:39 2022 +0800
[Feature] Simplify the configuration of runtime-mode and
parallelism-default (#1764)
---
.../org/apache/streampark/common/conf/ConfigConst.scala | 2 --
.../streampark/flink/core/FlinkStreamingInitializer.scala | 12 ------------
2 files changed, 14 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 3a500b258..6233eef31 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -114,8 +114,6 @@ object ConfigConst {
val KEY_YARN_APP_QUEUE = "yarn.application.queue"
- val KEY_EXECUTION_RUNTIME_MODE = "flink.execution.runtime-mode"
-
// ---table---
val KEY_FLINK_TABLE_PLANNER = "flink.table.planner"
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index abb5301ca..16e2c9d21 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -21,16 +21,13 @@ import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util._
-import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.CoreOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableConfig
import java.io.File
import collection.JavaConversions._
import collection.Map
-import util.Try
private[flink] object FlinkStreamingInitializer {
@@ -154,15 +151,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
def initEnvironment(): Unit = {
localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
- Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
-
Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
- } match {
- case p if p > 0 => localStreamEnv.setParallelism(p)
- case _ => throw new IllegalArgumentException("[StreamPark] parallelism
must be > 0. ")
- }
-
- val executionMode =
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
- localStreamEnv.setRuntimeMode(executionMode)
apiType match {
case ApiType.java if javaStreamEnvConfFunc != null =>
javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)