This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 82516afee [Feature] Simplify the configuration of watermark interval (#1745)
82516afee is described below

commit 82516afee069e35d10894efef4135e2e39b2581e
Author: 1996fanrui <[email protected]>
AuthorDate: Thu Oct 6 19:35:14 2022 +0800

    [Feature] Simplify the configuration of watermark interval (#1745)
---
 .../main/scala/org/apache/streampark/common/conf/ConfigConst.scala    | 2 --
 .../src/main/resources/flink-application.conf                         | 4 ++--
 .../org/apache/streampark/flink/core/FlinkStreamingInitializer.scala  | 4 ----
 .../org/apache/streampark/flink/core/FlinkStreamingInitializer.scala  | 4 ----
 .../org/apache/streampark/flink/core/FlinkStreamingInitializer.scala  | 4 ----
 .../org/apache/streampark/flink/core/FlinkStreamingInitializer.scala  | 4 ----
 .../org/apache/streampark/flink/core/FlinkTableInitializer.scala      | 2 +-
 7 files changed, 3 insertions(+), 21 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index c2f31e1cd..057a7a597 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -165,8 +165,6 @@ object ConfigConst {
 
   val KEY_EXECUTION_RUNTIME_MODE = "flink.execution.runtime-mode"
 
-  val KEY_FLINK_WATERMARK_INTERVAL = "flink.watermark.interval"
-
   // ---watermark---
   val KEY_FLINK_WATERMARK_TIME_CHARACTERISTIC = "flink.watermark.time.characteristic"
 
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
 
b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
index 46e64d151..fc76f0754 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
+++ 
b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
@@ -42,14 +42,14 @@ flink:
         jvm-overhead.max:
         jvm-overhead.min:
         managed.fraction: 0.4
+      pipeline:
+        auto-watermark-interval: 200ms
   checkpoints:
     enable: true
     interval: 30000
     mode: EXACTLY_ONCE
     timeout: 300000
     unaligned: true
-  watermark:
-    interval: 10000
   # state backend
   state:
     # Note that the configurations of flink1.12 and later are different, and the combined configuration should be selected reasonably
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index c20279128..d31fc8d42 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -169,10 +169,6 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
       case p if p > 0 => localStreamEnv.setParallelism(p)
       case _ => throw new IllegalArgumentException("[StreamPark] parallelism 
must be > 0. ")
     }
-    val interval = 
Try(parameter.get(KEY_FLINK_WATERMARK_INTERVAL).toInt).getOrElse(0)
-    if (interval > 0) {
-      localStreamEnv.getConfig.setAutoWatermarkInterval(interval)
-    }
 
     // Compatible with 1.12 and previous versions (TimeCharacteristic 
deprecated in version 1.12)
     if (classOf[TimeCharacteristic].getDeclaredAnnotation(classOf[Deprecated]) 
== null) {
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 43ec72ac8..b5daefa62 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -169,10 +169,6 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
       case p if p > 0 => localStreamEnv.setParallelism(p)
       case _ => throw new IllegalArgumentException("[StreamPark] parallelism 
must be > 0. ")
     }
-    val interval = 
Try(parameter.get(KEY_FLINK_WATERMARK_INTERVAL).toInt).getOrElse(0)
-    if (interval > 0) {
-      localStreamEnv.getConfig.setAutoWatermarkInterval(interval)
-    }
 
     val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
     localStreamEnv.setRuntimeMode(executionMode)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 850787cd2..e14edd70b 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -169,10 +169,6 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
       case p if p > 0 => localStreamEnv.setParallelism(p)
       case _ => throw new IllegalArgumentException("[StreamPark] parallelism 
must be > 0. ")
     }
-    val interval = 
Try(parameter.get(KEY_FLINK_WATERMARK_INTERVAL).toInt).getOrElse(0)
-    if (interval > 0) {
-      localStreamEnv.getConfig.setAutoWatermarkInterval(interval)
-    }
 
     val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
     localStreamEnv.setRuntimeMode(executionMode)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 784b48727..7c6475954 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -168,10 +168,6 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
       case p if p > 0 => localStreamEnv.setParallelism(p)
       case _ => throw new IllegalArgumentException("[StreamPark] parallelism 
must be > 0. ")
     }
-    val interval = 
Try(parameter.get(KEY_FLINK_WATERMARK_INTERVAL).toInt).getOrElse(0)
-    if (interval > 0) {
-      localStreamEnv.getConfig.setAutoWatermarkInterval(interval)
-    }
 
     val executionMode = 
Try(RuntimeExecutionMode.valueOf(parameter.get(KEY_EXECUTION_RUNTIME_MODE))).getOrElse(RuntimeExecutionMode.STREAMING)
     localStreamEnv.setRuntimeMode(executionMode)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 8e3cc7bbe..eb78a17b7 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_APP_NAME, KEY_EXECUTION_RUNTIME_MODE, KEY_FLINK_APP_NAME, KEY_FLINK_PARALLELISM, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG, KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE, KEY_FLINK_WATERMARK_INTERVAL}
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_APP_NAME, KEY_EXECUTION_RUNTIME_MODE, KEY_FLINK_APP_NAME, KEY_FLINK_PARALLELISM, KEY_FLINK_SQL, KEY_FLINK_TABLE_CATALOG, KEY_FLINK_TABLE_DATABASE, KEY_FLINK_TABLE_MODE}
 import org.apache.streampark.common.enums.{ApiType, TableMode}
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.enums.TableMode.TableMode

Reply via email to