This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 82516afee [Feature] Simplify the configuration of watermark interval
(#1745)
82516afee is described below
commit 82516afee069e35d10894efef4135e2e39b2581e
Author: 1996fanrui <[email protected]>
AuthorDate: Thu Oct 6 19:35:14 2022 +0800
[Feature] Simplify the configuration of watermark interval (#1745)
---
.../main/scala/org/apache/streampark/common/conf/ConfigConst.scala | 2 --
.../src/main/resources/flink-application.conf | 4 ++--
.../streampark-flink-shims_flink-1.12/.../core/FlinkStreamingInitializer.scala | 4 ----
.../streampark-flink-shims_flink-1.13/.../core/FlinkStreamingInitializer.scala | 4 ----
.../streampark-flink-shims_flink-1.14/.../core/FlinkStreamingInitializer.scala | 4 ----
.../streampark-flink-shims_flink-1.15/.../core/FlinkStreamingInitializer.scala | 4 ----
.../org/apache/streampark/flink/core/FlinkTableInitializer.scala | 2 +-
7 files changed, 3 insertions(+), 21 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index c2f31e1cd..057a7a597 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -165,8 +165,6 @@ object ConfigConst {
val KEY_EXECUTION_RUNTIME_MODE = "flink.execution.runtime-mode"
- val KEY_FLINK_WATERMARK_INTERVAL = "flink.watermark.interval"
-
// ---watermark---
val KEY_FLINK_WATERMARK_TIME_CHARACTERISTIC =
"flink.watermark.time.characteristic"
diff --git
a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
index 46e64d151..fc76f0754 100644
---
a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
+++
b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
@@ -42,14 +42,14 @@ flink:
jvm-overhead.max:
jvm-overhead.min:
managed.fraction: 0.4
+ pipeline:
+ auto-watermark-interval: 200ms
checkpoints:
enable: true
interval: 30000
mode: EXACTLY_ONCE
timeout: 300000
unaligned: true
- watermark:
- interval: 10000
# state backend
state:
# Note that the configurations of flink1.12 and later are different, and
the combined configuration should be selected reasonably
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index c20279128..d31fc8d42 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -169,10 +169,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
case p if p > 0 => localStreamEnv.setParallelism(p)
case _ => throw new IllegalArgumentException("[StreamPark] parallelism
must be > 0. ")
}
- val interval =
Try(parameter.get(KEY_FLINK_WATERMARK_INTERVAL).toInt).getOrElse(0)
- if (interval > 0) {
- localStreamEnv.getConfig.setAutoWatermarkInterval(interval)
- }
// Compatible with 1.12 and previous versions (TimeCharacteristic
deprecated in version 1.12)
if (classOf[TimeCharacteristic].getDeclaredAnnotation(classOf[Deprecated])
== null) {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 43ec72ac8..b5daefa62 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -169,10 +169,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
case p if p > 0 => localStreamEnv.setParallelism(p)
case _ => throw new IllegalArgumentException("[StreamPark] parallelism
must be > 0. ")
}
- val interval =
Try(parameter.get(KEY_FLINK_WATERMARK_INTERVAL).toInt).getOrElse(0)
- if (interval > 0) {
- localStreamEnv.getConfig.setAutoWatermarkInterval(interval)
- }
val executionMode =
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
localStreamEnv.setRuntimeMode(executionMode)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 850787cd2..e14edd70b 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -169,10 +169,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
case p if p > 0 => localStreamEnv.setParallelism(p)
case _ => throw new IllegalArgumentException("[StreamPark] parallelism
must be > 0. ")
}
- val interval =
Try(parameter.get(KEY_FLINK_WATERMARK_INTERVAL).toInt).getOrElse(0)
- if (interval > 0) {
- localStreamEnv.getConfig.setAutoWatermarkInterval(interval)
- }
val executionMode =
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
localStreamEnv.setRuntimeMode(executionMode)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 784b48727..7c6475954 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -168,10 +168,6 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
case p if p > 0 => localStreamEnv.setParallelism(p)
case _ => throw new IllegalArgumentException("[StreamPark] parallelism
must be > 0. ")
}
- val interval =
Try(parameter.get(KEY_FLINK_WATERMARK_INTERVAL).toInt).getOrElse(0)
- if (interval > 0) {
- localStreamEnv.getConfig.setAutoWatermarkInterval(interval)
- }
val executionMode =
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
localStreamEnv.setRuntimeMode(executionMode)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 8e3cc7bbe..eb78a17b7 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF,
KEY_APP_NAME, KEY_EXECUTION_RUNTIME_MODE, KEY_FLINK_APP_NAME,
KEY_FLINK_PARALLELISM, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG,
KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE, KEY_FLINK_WATERMARK_INTERVAL}
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF,
KEY_APP_NAME, KEY_EXECUTION_RUNTIME_MODE, KEY_FLINK_APP_NAME,
KEY_FLINK_PARALLELISM, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG,
KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
import org.apache.streampark.common.enums.{ApiType, TableMode}
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.enums.TableMode.TableMode