Myasuka commented on code in PR #22767:
URL: https://github.com/apache/flink/pull/22767#discussion_r1227481717


##########
docs/content.zh/docs/dev/table/concepts/time_attributes.md:
##########
@@ -272,6 +272,99 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 
 ```
 
+#### 在SQL中使用watermark进阶功能
+之前的版本中,Watermark的很多进阶功能(比如watermark对齐)通过datastream 
api很容易使用,但想在sql中使用却不太容易,所以我们在1.18版本对这些功能进行了扩展,使用户也能够在sql中用到这些功能。
+
+{{< hint warning >}}
+**Note:** 只有实现了`SupportsWatermarkPushDown`接口的源连接器(source 
connector)(比如kafka、pulsar)才可以使用这些进阶功能。如果一个源连接器(source 
connector)没有实现`SupportsWatermarkPushDown`接口,但是任务配置了这些参数,任务可以正常运行,但是这些参数也不会生效。
+{{< /hint >}}
+
+##### I. 配置Watermark发射方式
+Flink中watermark有两种发射方式:
+
+- on-periodic: 周期性发射
+- on-event: 每条事件数据发射一次watermark
+
+在DataStream API,用户可以通过WatermarkGenerator接口来决定选择哪种方式([自定义 
WatermarkGenerator]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#自定义-watermarkgenerator)),而对于sql任务,watermark默认是周期性发射的方式,默认周期是200ms,这个周期可以通过参数`pipeline.auto-watermark-interval`来进行修改。如果需要每条事件数据都发射一次watermark,可以在source表中进行如下配置:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.emit.strategy'='on-event',
+  ...
+);
+```
+
+当然,也可以使用`OPTIONS` hint来配置:
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ 
OPTIONS('scan.watermark.emit.strategy'='on-periodic') */
+```
+
+##### II. 配置数据源(Source)的空闲超时时间
+
+如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着`WatermarkGenerator`也不会获得任何新数据去生成watermark,我们称这类数据源为空闲输入或空闲源。在这种情况下,如果其他某些分区仍然在发送事件数据就会出现问题,因为下游算子watermark的计算方式是取所有上游并行数据源watermark的最小值,由于空闲的分片/分区没有计算新的watermark,任务的watermark将不会发生变化,如果配置了数据源的空闲超时时间,一个分区/分片在超时时间没有发送事件数据就会被标记为空闲,下游计算新的watermark的时候将会忽略这个空闲sourse,从而让watermark继续推进。
+
+在sql中可以通过`table.exec.source.idle-timeout`参数来定义一个全局的超时时间,每个数据源都会生效。但如果你想为每个数据源设置不同的空闲超时时间,可以直接在源表中进行设置:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.idle-timeout'='1min',
+  ...
+);
+```
+
+或者也可以使用`OPTIONS` hint:
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ OPTIONS('scan.watermark.idle-timeout'='1min') 
*/
+```
+
+##### III. Watermark对齐
+受到数据分布或者机器负载等各种因素的影响,同一个数据源的不同分区/分片之间可能出现消费速度不一样的情况,不同数据源之间的消费速度也可能不一样,假如下游有一些有状态的算子,这些算子可能需要在状态中缓存更多那些消费更快的数据,等待那些消费慢的数据,状态可能会变得很大;消费速率不一致也可能造成更严重的数据乱序情况,可能会影响窗口的计算准确度。这些场景都可以使用watermark对齐功能,确保源表的某个分片/分块/分区的watermark不会比其他分片/分块/分区增加太快,从而避免上述问题,需要注意的是watermark对齐功能会影响任务的性能,这取决于不同源表之间数据消费差别有多大。
+
+在sql任务中可以在源表中配置watermark对齐:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+...
+user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+'scan.watermark.alignment.group'='alignment-group-1',
+'scan.watermark.alignment.max-drift'='1min',
+'scan.watermark.alignment.update-interval'='1s',
+...
+);
+```
+
+当然,你也依然可以用`OPTIONS` hint:
+
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ 
OPTIONS('scan.watermark.alignment.group'='alignment-group-1', 
'scan.watermark.alignment.max-drift'='1min', 
'scan.watermark.alignment.update-interval'='1s') */
+```
+
+这里有三个参数:
+
+- `scan.watermark.alignment.group`配置对齐组名称,在同一个组的数据源将会对齐
+- `scan.watermark.alignment.max-drift`配置分片/分块/分区允许偏离对齐时间的最大范围
+- `scan.watermark.alignment.update-interval`配置计算对齐时间的频率,非必需,默认是1s
+
+{{< hint warning >}}
+**Note:** 如果源连接器(source 
connector)未实现[FLIP-217](https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits),并且使用了watermark对齐的功能,那么任务运行会抛出异常,用户可以设置pipeline.watermark-arignment.Allow-naligned-Source-Splits=true来禁用源分片的WaterMark对齐功能,此时,只有当分片数量等于源并行度的时候,watermark对齐功能才能正常工作。

Review Comment:
   Wrong parameters, it should be 
`pipeline.watermark-alignment.allow-unaligned-source-splits`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/watermark/WatermarkParams.java:
##########
@@ -26,27 +38,93 @@ public void setEmitStrategy(WatermarkEmitStrategy 
emitStrategy) {
         this.emitStrategy = emitStrategy;
     }
 
+    public String getAlignGroupName() {
+        return alignGroupName;
+    }
+
+    public void setAlignGroupName(String alignGroupName) {
+        this.alignGroupName = alignGroupName;
+    }
+
+    public Duration getAlignMaxDrift() {
+        return alignMaxDrift;
+    }
+
+    public void setAlignMaxDrift(Duration alignMaxDrift) {
+        this.alignMaxDrift = alignMaxDrift;
+    }
+
+    public Duration getAlignUpdateInterval() {
+        return alignUpdateInterval;
+    }
+
+    public void setAlignUpdateInterval(Duration alignUpdateInterval) {
+        this.alignUpdateInterval = alignUpdateInterval;
+    }
+
+    public boolean alignWatermarkEnabled() {
+        return !StringUtils.isNullOrWhitespaceOnly(alignGroupName)
+                && alignMaxDrift != null
+                && isDurationPositive(alignMaxDrift)
+                && alignUpdateInterval != null
+                && isDurationPositive(alignUpdateInterval);
+    }
+
+    private boolean isDurationPositive(Duration duration) {
+        return !duration.isNegative() && !duration.isZero();

Review Comment:
   We can refactor this method to avoid check whether the duration is null.
   ~~~java
       private boolean isDurationPositive(Duration duration) {
           return duration != null && !duration.isNegative() && 
!duration.isZero();
       }
   ~~~



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java:
##########
@@ -122,6 +119,20 @@ private static HintOptionChecker 
fixedSizeListOptionChecker(int size) {
                             
FlinkHints.stringifyHints(Collections.singletonList(hint)),
                             hint.hintName);
 
+    private static final HintOptionChecker OPTIONS_KV_OPTION_CHECKER =

Review Comment:
   why we need to introduce such refactor?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java:
##########
@@ -52,21 +54,27 @@
 public final class WatermarkPushDownSpec extends SourceAbilitySpecBase {
     public static final String FIELD_NAME_WATERMARK_EXPR = "watermarkExpr";
     public static final String FIELD_NAME_IDLE_TIMEOUT_MILLIS = 
"idleTimeoutMillis";
+    public static final String FIELD_NAME_WATERMARK_PARAMS = "watermarkParams";
 
     @JsonProperty(FIELD_NAME_WATERMARK_EXPR)
     private final RexNode watermarkExpr;
 
     @JsonProperty(FIELD_NAME_IDLE_TIMEOUT_MILLIS)
     private final long idleTimeoutMillis;
 
+    @JsonProperty(FIELD_NAME_WATERMARK_PARAMS)
+    private final WatermarkParams watermarkParams;

Review Comment:
   we should add `@Nullable` to this field.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java:
##########
@@ -117,8 +130,37 @@ protected FlinkLogicalTableSourceScan getNewScan(
                 idleTimeoutMillis = -1L;
             }
 
+            Optional<RelHint> optionsHintOptional =
+                    scan.getHints().stream()
+                            .filter(
+                                    relHint ->
+                                            relHint.hintName.equalsIgnoreCase(
+                                                    
FlinkHints.HINT_NAME_OPTIONS))
+                            .findFirst();
+            Configuration hintOptions = new Configuration();
+            if (optionsHintOptional.isPresent()) {
+                RelHint optionsHint = optionsHintOptional.get();
+                hintOptions = Configuration.fromMap(optionsHint.kvOptions);
+            }
+            Configuration tableOptions = new Configuration();
+            RelOptTable table = scan.getTable();
+            if (table instanceof TableSourceTable) {
+                Map<String, String> tableConfigs =
+                        ((TableSourceTable) table)
+                                .contextResolvedTable()
+                                .getResolvedTable()
+                                .getOptions();
+                tableOptions = Configuration.fromMap(tableConfigs);
+            }
+            Optional<WatermarkParams> watermarkParamsOptional =
+                    parseWatermarkParams(hintOptions, tableOptions);
+
             final WatermarkPushDownSpec watermarkPushDownSpec =
-                    new WatermarkPushDownSpec(watermarkExpr, 
idleTimeoutMillis, producedType);
+                    new WatermarkPushDownSpec(
+                            watermarkExpr,
+                            idleTimeoutMillis,
+                            producedType,
+                            watermarkParamsOptional.orElse(null));

Review Comment:
   I think the `watermarkParamsOptional` would not be empty. If so, why we 
still need to introduce `parseWatermarkParams` to generate the Optional 
parameter?
   
   Moreover, if we refactor this, the `watermarkParams` in 
`WatermarkPushDownSpec` cannot be nullable.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java:
##########
@@ -275,4 +275,124 @@ public void 
testWatermarkWithPythonFunctionInComputedColumn() {
         util.tableEnv().executeSql(ddl);
         util.verifyRelPlan("SELECT * FROM MyTable");
     }
+
+    @Test
+    public void testWatermarkEmitStrategyWithOptions() {
+        String ddl =
+                "CREATE TABLE MyTable("
+                        + "  a INT,\n"
+                        + "  b BIGINT,\n"
+                        + "  c TIMESTAMP(3),\n"
+                        + "  WATERMARK FOR c AS c - INTERVAL '5' SECOND\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'enable-watermark-push-down' = 'true',\n"
+                        + "  'bounded' = 'false',\n"
+                        + "  'scan.watermark.emit.strategy' = 'on-event',\n"
+                        + "  'disable-lookup' = 'true'"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        util.verifyRelPlan("select a, c from MyTable");
+    }
+
+    @Test
+    public void testWatermarkEmitStrategyWithHint() {
+        String ddl =
+                "CREATE TABLE MyTable("
+                        + "  a INT,\n"
+                        + "  b BIGINT,\n"
+                        + "  c TIMESTAMP(3),\n"
+                        + "  WATERMARK FOR c AS c - INTERVAL '5' SECOND\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'enable-watermark-push-down' = 'true',\n"
+                        + "  'bounded' = 'false',\n"
+                        + "  'disable-lookup' = 'true'"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        util.verifyRelPlan(
+                "select a, c from MyTable /*+ OPTIONS("
+                        + "'scan.watermark.emit.strategy'='on-event') */");
+    }
+
+    @Test
+    public void testWatermarkAlignmentWithOptions() {
+        String ddl =
+                "CREATE TABLE MyTable("
+                        + "  a INT,\n"
+                        + "  b BIGINT,\n"
+                        + "  c TIMESTAMP(3),\n"
+                        + "  WATERMARK FOR c AS c - INTERVAL '5' SECOND\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'enable-watermark-push-down' = 'true',\n"
+                        + "  'bounded' = 'false',\n"
+                        + "  'scan.watermark.alignment.group' = 'group1',\n"
+                        + "  'scan.watermark.alignment.max-drift' = '1min',\n"
+                        + "  'disable-lookup' = 'true'"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        util.verifyRelPlan("select a, c from MyTable");
+    }
+
+    @Test
+    public void testWatermarkAlignmentWithHint() {
+        String ddl =
+                "CREATE TABLE MyTable("
+                        + "  a INT,\n"
+                        + "  b BIGINT,\n"
+                        + "  c TIMESTAMP(3),\n"
+                        + "  WATERMARK FOR c AS c - INTERVAL '5' SECOND\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'enable-watermark-push-down' = 'true',\n"
+                        + "  'bounded' = 'false',\n"
+                        + "  'disable-lookup' = 'true'"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        util.verifyRelPlan(
+                "select a, c from MyTable /*+ OPTIONS("
+                        + "'scan.watermark.alignment.group'='group1', "
+                        + "'scan.watermark.alignment.max-drift'='1min') */");
+    }
+
+    @Test
+    public void testIdleSourceWithOptions() {
+        util.tableEnv().getConfig().set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, 
Duration.ofMillis(1000));
+        String ddl =
+                "CREATE TABLE MyTable("
+                        + "  a INT,\n"
+                        + "  b BIGINT,\n"
+                        + "  c TIMESTAMP(3),\n"
+                        + "  WATERMARK FOR c AS c - INTERVAL '5' SECOND\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'enable-watermark-push-down' = 'true',\n"
+                        + "  'bounded' = 'false',\n"
+                        + "  'disable-lookup' = 'true',\n"
+                        + "  'scan.watermark.idle-timeout' = '60000'"

Review Comment:
   I think we'd better to avoid duration configuration without time unit.



##########
docs/content/docs/dev/table/concepts/time_attributes.md:
##########
@@ -99,6 +99,101 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 
 ```
 
+#### Advanced watermark features
+
+In previous versions, many advanced features of watermark (such as watermark 
alignment) were easy to use through the datastream api, but not so easy to use 
in sql, so we have extended these features in version 1.18 to enable users to 
use them in sql as well.
+
+{{< hint warning >}}
+**Note:** Only source connectors that implement the 
`SupportsWatermarkPushDown` interface (e.g. kafka, pulsar) can use these 
advanced features. If a source does not implement the 
`SupportsWatermarkPushDown` interface, but the task is configured with these 
parameters, the task can run normally, but these parameters will not take 
effect.
+{{< /hint >}}
+
+##### I. Configure watermark emit strategy
+There are two strategies to emit watermark in flink:
+
+- on-periodic: emit watermark periodic
+- on-event: emit watermark per event data
+
+In the DataStream API, the user can choose the emit strategy through the 
WatermarkGenerator interface ([Writing WatermarkGenerators]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#writing-watermarkgenerators)). For sql tasks, watermark is emited 
periodically by default, with a default period of 200ms, which can be changed 
by the parameter `pipeline.auto-watermark-interval`. If you need to emit 
watermark per event data, you can configure it in the source table as follows:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3).
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.exit.strategy'='on-event'.
+  ...
+)
+```
+
+Of course, you can also use the `OPTIONS` hint:
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ 
OPTIONS('scan.watermark.exit.strategy'='on-periodic') */
+```
+
+##### II. Configure the idle-timeout of source table
+
+If a split/partition/shard in the source table does not send event data for 
some time, it means that `WatermarkGenerator` will not get any new data to 
generate watermark either, we call such data sources as idle inputs or idle 
sources. In this case, a problem occurs if some other partition is still 
sending event data, because the downstream operator's watermark is calculated 
by taking the minimum value of all upstream parallel data sources' watermarks, 
and since the idle split/partition/shard is not generating a new watermark, the 
downstream operator's watermark will not change. However, if the idle timeout 
is configured, the split/partition/shard will be marked as idle if no event 
data is sent in the timeout, and the downstream will ignore this idle sourse 
when calculating new watermark.
+
+A global idle timeout can be defined in sql with the 
`table.exec.source.idle-timeout` parameter, which will take effect for each 
source table. However, if you want to set a different idle timeout for each 
source table, you can configure in the source table by parameter 
`scan.watermark.idle-timeout` like this:

Review Comment:
   We should add related descriptions to the configuration of 
`table.exec.source.idle-timeout` to tell users that this value could be 
overridden by settings of table scope.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java:
##########
@@ -81,12 +89,21 @@ public void apply(DynamicTableSource tableSource, 
SourceAbilityContext context)
                             Option.apply("context"));
 
             WatermarkGeneratorSupplier<RowData> supplier =
-                    new 
GeneratedWatermarkGeneratorSupplier(generatedWatermarkGenerator);
+                    new GeneratedWatermarkGeneratorSupplier(
+                            generatedWatermarkGenerator, watermarkParams);
 
             WatermarkStrategy<RowData> watermarkStrategy = 
WatermarkStrategy.forGenerator(supplier);
-            if (idleTimeoutMillis > 0) {
+            if (watermarkParams != null && 
watermarkParams.alignWatermarkEnabled()) {
+                watermarkStrategy =
+                        watermarkStrategy.withWatermarkAlignment(

Review Comment:
   Since we already provide the `watermarkParams` to 
`GeneratedWatermarkGeneratorSupplier`, why we still need to regenerate the 
`watermarkStrategy`? Can we just get the needed `watermarkStrategy` once 
calling `WatermarkStrategy.forGenerator(supplier)`?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/watermark/WatermarkEmitStrategy.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.table.watermark;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+
+/** The strategy for emitting watermark. */
+@PublicEvolving
+public enum WatermarkEmitStrategy {
+    ON_EVENT("on-event"),
+    ON_PERIODIC("on-periodic"),
+    ;
+
+    private final String alias;
+
+    public String getAlias() {
+        return alias;
+    }
+
+    WatermarkEmitStrategy(String alias) {
+        this.alias = alias;
+    }
+
+    public boolean isOnEvent() {
+        return this == ON_EVENT;
+    }
+
+    public boolean isOnPeriodic() {
+        return this == ON_PERIODIC;
+    }
+
+    @Override
+    public String toString() {
+        return this.alias;
+    }
+
+    public static WatermarkEmitStrategy lookup(String alias) {

Review Comment:
   Where did this method use?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java:
##########
@@ -133,6 +144,14 @@ public final class FactoryUtil {
      */
     public static final String PLACEHOLDER_SYMBOL = "#";
 
+    private static final Set<ConfigOption<?>> watermarkOptionSet;

Review Comment:
   We can still initialize the `watermarkOptionSet` in one line:
   ~~~java
       private static final Set<ConfigOption<?>> watermarkOptionSet = 
Collections.unmodifiableSet(
               new HashSet<>(Arrays.asList(
                       WATERMARK_EMIT_STRATEGY,
                       WATERMARK_ALIGNMENT_GROUP,
                       WATERMARK_ALIGNMENT_MAX_DRIFT,
                       WATERMARK_ALIGNMENT_UPDATE_INTERVAL,
                       SOURCE_IDLE_TIMEOUT)));
   ~~~



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java:
##########
@@ -129,6 +130,24 @@ public final class FactoryUtil {
                                     + "'on-periodic' means emitting watermark 
periodically. "
                                     + "The default strategy is 'on-periodic'");
 
+    public static final ConfigOption<String> WATERMARK_ALIGNMENT_GROUP =
+            ConfigOptions.key("scan.watermark.alignment.group")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The watermark alignment group name.");
+
+    public static final ConfigOption<Duration> WATERMARK_ALIGNMENT_MAX_DRIFT =
+            ConfigOptions.key("scan.watermark.alignment.max-drift")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("The max allowed watermark drift.");
+
+    public static final ConfigOption<Duration> 
WATERMARK_ALIGNMENT_UPDATE_INTERVAL =
+            ConfigOptions.key("scan.watermark.alignment.update-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(1000))
+                    .withDescription("update interval to align watermark.");

Review Comment:
   ~~~java
   .withDescription("Update interval to align watermark.");
   ~~~



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java:
##########
@@ -104,11 +121,25 @@ public String getDigests(SourceAbilityContext context) {
                         watermarkExpr,
                         JavaScalaConversionUtil.toScala(
                                 context.getSourceRowType().getFieldNames()));
-        if (idleTimeoutMillis > 0) {
-            return String.format(
-                    "watermark=[%s], idletimeout=[%d]", expressionStr, 
idleTimeoutMillis);
+        String digest = String.format("watermark=[%s]", expressionStr);
+        long actualIdleTimeoutMillis = calculateIdleTimeoutMillis();
+        if (actualIdleTimeoutMillis > 0) {
+            digest = String.format("%s, idletimeout=[%d]", digest, 
actualIdleTimeoutMillis);

Review Comment:
   ```suggestion
               digest = String.format("%s, idleTimeout=[%d]", digest, 
actualIdleTimeoutMillis);
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java:
##########
@@ -52,21 +54,27 @@
 public final class WatermarkPushDownSpec extends SourceAbilitySpecBase {
     public static final String FIELD_NAME_WATERMARK_EXPR = "watermarkExpr";
     public static final String FIELD_NAME_IDLE_TIMEOUT_MILLIS = 
"idleTimeoutMillis";
+    public static final String FIELD_NAME_WATERMARK_PARAMS = "watermarkParams";
 
     @JsonProperty(FIELD_NAME_WATERMARK_EXPR)
     private final RexNode watermarkExpr;
 
     @JsonProperty(FIELD_NAME_IDLE_TIMEOUT_MILLIS)
     private final long idleTimeoutMillis;

Review Comment:
   I think this field should be renamed as `globalIdleTimeoutMillis`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/watermark/WatermarkParams.java:
##########
@@ -0,0 +1,153 @@
+package org.apache.flink.table.watermark;

Review Comment:
   This file lacks of apache license.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java:
##########
@@ -131,4 +162,12 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), watermarkExpr, 
idleTimeoutMillis);
     }
+
+    private long calculateIdleTimeoutMillis() {
+        long actualIdleTimeoutMillis = idleTimeoutMillis;
+        if (watermarkParams != null && watermarkParams.getSourceIdleTimeout() 
> 0) {

Review Comment:
   What will happen if user want to set the idle timeout as zero?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/watermark/WatermarkEmitStrategy.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.table.watermark;

Review Comment:
   This file lacks of apache license.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to