This is an automated email from the ASF dual-hosted git repository.

lgbo-ustc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2391c2bb8a [GLUTEN-12300][FLINK] Add support for watermark push-down in kafka source (#12301)
2391c2bb8a is described below

commit 2391c2bb8a1612460b968f15c36a1b8e38b09f98
Author: kevinyhzou <[email protected]>
AuthorDate: Fri Jun 26 09:48:38 2026 +0800

    [GLUTEN-12300][FLINK] Add support for watermark push-down in kafka source (#12301)
    
    * watermark pushdown
    
    * change version for test
    
    * fix ut
    
    * add ut
    
    * add ut for nexmark source
    
    * add watermark push down for sql
    
    * fix ut failure
    
    * fix ut
    
    * update version and doc
    
    * update version
    
    * fix pulsar build
    
    * fix ci
    
    * update flink.md
---
 gluten-flink/docs/Flink.md                         |   2 +-
 .../exec/stream/StreamExecTableSourceScan.java     |  76 +++++-
 .../functions/SubstractRexCallConverter.java       |   2 +-
 .../gluten/velox/KafkaSourceSinkFactory.java       |  13 +-
 .../table/runtime/stream/custom/NexmarkTest.java   |  42 +++
 .../gluten/velox/KafkaSourceSinkFactoryTest.java   | 302 +++++++++++++++++++++
 6 files changed, 431 insertions(+), 6 deletions(-)

diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index b3322fa9d3..66387b782d 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
+git reset --hard 97fc1edafebd0f505e613d260f77f92f5252d048
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 90b3981f0f..e7ec714e70 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -16,16 +16,30 @@
  */
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.gluten.rexnode.RexConversionContext;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
 import org.apache.gluten.velox.VeloxSourceSinkFactory;
 
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.ProjectNode;
+
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import 
org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -34,13 +48,20 @@ import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSour
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.TimestampType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import org.apache.calcite.rex.RexNode;
+
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Stream {@link ExecNode} to read data from an external source defined by a 
{@link
@@ -98,6 +119,51 @@ public class StreamExecTableSourceScan extends CommonExecTableSourceScan
     return env.createInput(inputFormat, 
outputTypeInfo).name(operatorName).getTransformation();
   }
 
+  private ProjectNode translateWatermarkExpr(
+      LogicalType inputType, LogicalType outputType, RexNode watermarkExpr) {
+    List<String> inNames = Utils.getNamesFromRowType(inputType);
+    RexConversionContext conversionContext = new RexConversionContext(inNames);
+    TypedExpr watermarkExprs = RexNodeConverter.toTypedExpr(watermarkExpr, 
conversionContext);
+    io.github.zhztheplayer.velox4j.type.RowType outputRowType =
+        (io.github.zhztheplayer.velox4j.type.RowType) 
LogicalTypeConverter.toVLType(outputType);
+    return new ProjectNode(
+        PlanNodeIdGenerator.newId(),
+        List.of(new EmptyNode(outputRowType)),
+        List.of("TIMESTAMP"),
+        List.of(watermarkExprs));
+  }
+
+  private Optional<io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec>
+      getWatermarkPushDownSpec(Transformation<RowData> transformation, 
ExecNodeConfig config) {
+    io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec 
watermarkPushDownSpecNode = null;
+    if (transformation instanceof SourceTransformation) {
+      List<SourceAbilitySpec> sourceAbilities = 
getTableSourceSpec().getSourceAbilities();
+      if (sourceAbilities != null) {
+        for (SourceAbilitySpec sourceAbility : sourceAbilities) {
+          if (sourceAbility instanceof WatermarkPushDownSpec) {
+            final long idleTimeout =
+                
config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT).toMillis();
+            final long watermarkInterval =
+                config.get(PipelineOptions.AUTO_WATERMARK_INTERVAL).toMillis();
+            WatermarkPushDownSpec watermarkPushDownSpec = 
(WatermarkPushDownSpec) sourceAbility;
+            RowField watermarkField = new RowField("watermark", new 
TimestampType(3));
+            ProjectNode project =
+                translateWatermarkExpr(
+                    getOutputType(),
+                    new RowType(List.of(watermarkField)),
+                    watermarkPushDownSpec.getWatermarkExpr());
+            watermarkPushDownSpecNode =
+                new io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec(
+                    project, idleTimeout, watermarkInterval, -1);
+          }
+        }
+      }
+    }
+    return watermarkPushDownSpecNode != null
+        ? Optional.of(watermarkPushDownSpecNode)
+        : Optional.empty();
+  }
+
   @Override
   protected Transformation<RowData> translateToPlanInternal(
       PlannerBase planner, ExecNodeConfig config) {
@@ -106,14 +172,18 @@ public class StreamExecTableSourceScan extends CommonExecTableSourceScan
         getTableSourceSpec()
             .getScanTableSource(
                 planner.getFlinkContext(), 
ShortcutUtils.unwrapTypeFactory(planner));
-    Transformation<RowData> sourceTransformation = 
super.translateToPlanInternal(planner, config);
+    Transformation<RowData> transformation = 
super.translateToPlanInternal(planner, config);
+    Optional<io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec> 
watermarkPushDownSpec =
+        getWatermarkPushDownSpec(transformation, config);
     return VeloxSourceSinkFactory.buildSource(
-        sourceTransformation,
+        transformation,
         Map.of(
             ScanTableSource.class.getName(),
             tableSource,
             "checkpoint.enabled",
-            
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled()));
+            
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled(),
+            "watermarkPushDownSpec",
+            watermarkPushDownSpec));
     // --- End Gluten-specific code changes ---
   }
 }
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubstractRexCallConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubstractRexCallConverter.java
index 1c7e861595..f2e99df962 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubstractRexCallConverter.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubstractRexCallConverter.java
@@ -68,7 +68,7 @@ class SubtractRexCallConverter extends BaseRexCallConverter {
         && params.get(1).getReturnType() instanceof BigIntType) {
 
       Type bigIntType = new BigIntType();
-      TypedExpr castExpr = new CallTypedExpr(bigIntType, 
List.of(params.get(0)), "cast");
+      TypedExpr castExpr = new CallTypedExpr(bigIntType, 
List.of(params.get(0)), "unix_millis");
 
       List<TypedExpr> newParams = List.of(castExpr, params.get(1));
       return new CallTypedExpr(bigIntType, newParams, functionName);
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
index 8644dac3da..589e57b0a2 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
@@ -24,8 +24,11 @@ import org.apache.gluten.util.ReflectUtils;
 
 import io.github.zhztheplayer.velox4j.connector.KafkaConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.KafkaTableHandle;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanWithWatermarkNode;
+import io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec;
 import io.github.zhztheplayer.velox4j.type.RowType;
 
 import org.apache.flink.api.connector.source.Source;
@@ -41,6 +44,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -68,6 +72,8 @@ public class KafkaSourceSinkFactory implements VeloxSourceSinkFactory {
       ScanTableSource tableSource =
           (ScanTableSource) parameters.get(ScanTableSource.class.getName());
       boolean checkpointEnabled = (Boolean) 
parameters.get("checkpoint.enabled");
+      Optional<WatermarkPushDownSpec> watermarkPushDownSpec =
+          (Optional<WatermarkPushDownSpec>) 
parameters.get("watermarkPushDownSpec");
       Class<?> tableSourceClazz =
           
Class.forName("org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource");
       Properties properties =
@@ -112,7 +118,12 @@ public class KafkaSourceSinkFactory implements VeloxSourceSinkFactory {
               
Boolean.valueOf(kafkaTableParameters.getOrDefault("enable.auto.commit", 
"false")),
               "latest",
               List.of());
-      TableScanNode kafkaScan = new TableScanNode(planId, outputType, 
kafkaTableHandle, List.of());
+
+      PlanNode kafkaScan =
+          watermarkPushDownSpec.isPresent()
+              ? new TableScanWithWatermarkNode(
+                  planId, outputType, kafkaTableHandle, List.of(), 
watermarkPushDownSpec.get())
+              : new TableScanNode(planId, outputType, kafkaTableHandle, 
List.of());
       GlutenStreamSource sourceOp =
           new GlutenStreamSource(
               new GlutenSourceFunction(
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index 476c4cba4d..3a40bda78c 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -41,6 +41,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -104,6 +105,45 @@ public class NexmarkTest {
     tEnv = StreamTableEnvironment.create(env, settings);
   }
 
+  @Test
+  void testNexmarkSourceSqlDoesNotPushDownWatermark() {
+    String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + 
"/ddl_gen.sql");
+    createNexmarkSource = replaceVariables(createNexmarkSource, 
NEXMARK_VARIABLES);
+    try {
+      tEnv.executeSql(createNexmarkSource);
+      String explain = tEnv.explainSql("SELECT * FROM datagen");
+
+      assertThat(explain).contains("WatermarkAssigner");
+      List<String> tableSourceScanLines =
+          Arrays.stream(explain.split("\\R"))
+              .filter(line -> line.contains("TableSourceScan"))
+              .collect(Collectors.toList());
+      assertThat(tableSourceScanLines).isNotEmpty();
+      assertThat(tableSourceScanLines).noneMatch(line -> 
line.contains("watermark=["));
+    } finally {
+      tEnv.executeSql("drop table if exists datagen");
+    }
+  }
+
+  @Test
+  void testKafkaSourceSqlPushesDownWatermark() {
+    String createKafkaSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + 
"/ddl_kafka.sql");
+    createKafkaSource = replaceVariables(createKafkaSource, KAFKA_VARIABLES);
+    try {
+      tEnv.executeSql(createKafkaSource);
+      String explain = tEnv.explainSql("SELECT * FROM kafka");
+
+      List<String> tableSourceScanLines =
+          Arrays.stream(explain.split("\\R"))
+              .filter(line -> line.contains("TableSourceScan"))
+              .collect(Collectors.toList());
+      assertThat(tableSourceScanLines).isNotEmpty();
+      assertThat(tableSourceScanLines).anyMatch(line -> 
line.contains("watermark=["));
+    } finally {
+      tEnv.executeSql("drop table if exists kafka");
+    }
+  }
+
   @Test
   void testAllNexmarkSourceQueries()
       throws ExecutionException, InterruptedException, TimeoutException {
@@ -174,6 +214,8 @@ public class NexmarkTest {
       String sql = String.format("drop function if exists %s", func);
       tEnv.executeSql(sql);
     }
+    tEnv.executeSql("drop table if exists datagen");
+    tEnv.executeSql("drop table if exists kafka");
   }
 
   private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, 
boolean kafkaSource)
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/KafkaSourceSinkFactoryTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/KafkaSourceSinkFactoryTest.java
new file mode 100644
index 0000000000..1e504b3889
--- /dev/null
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/KafkaSourceSinkFactoryTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+
+import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.ProjectNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanWithWatermarkNode;
+import io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec;
+import io.github.zhztheplayer.velox4j.type.IntegerType;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class KafkaSourceSinkFactoryTest {
+
+  private final KafkaSourceSinkFactory factory = new KafkaSourceSinkFactory();
+
+  @Test
+  public void testBuildVeloxSourceWithoutWatermarkPushDownUsesTableScan() {
+    PlanNode scan = buildScanNode(Optional.empty());
+
+    assertThat(scan).isInstanceOf(TableScanNode.class);
+    assertThat(scan).isNotInstanceOf(TableScanWithWatermarkNode.class);
+  }
+
+  @Test
+  public void 
testBuildVeloxSourceWithWatermarkPushDownUsesTableScanWithWatermark() {
+    WatermarkPushDownSpec watermarkPushDownSpec = 
createWatermarkPushDownSpec();
+
+    PlanNode scan = buildScanNode(Optional.of(watermarkPushDownSpec));
+
+    assertThat(scan).isInstanceOf(TableScanWithWatermarkNode.class);
+    assertThat(((TableScanWithWatermarkNode) scan).getWatermarkPushDownSpec())
+        .isSameAs(watermarkPushDownSpec);
+  }
+
+  private PlanNode buildScanNode(Optional<WatermarkPushDownSpec> 
watermarkPushDownSpec) {
+    LegacySourceTransformation<RowData> transformation =
+        (LegacySourceTransformation<RowData>)
+            factory.buildVeloxSource(
+                createSourceTransformation(),
+                Map.of(
+                    ScanTableSource.class.getName(),
+                    createKafkaDynamicSource(),
+                    "checkpoint.enabled",
+                    false,
+                    "watermarkPushDownSpec",
+                    watermarkPushDownSpec));
+
+    GlutenStreamSource source = (GlutenStreamSource) 
transformation.getOperator();
+    return source.getPlanNode().getNode();
+  }
+
+  private static SourceTransformation<RowData, TestSplit, Void> 
createSourceTransformation() {
+    RowType rowType =
+        (RowType)
+            DataTypes.ROW(
+                    DataTypes.FIELD("id", DataTypes.INT()),
+                    DataTypes.FIELD("name", DataTypes.STRING()))
+                .getLogicalType();
+    return new SourceTransformation<>(
+        "KafkaSource",
+        new KafkaSource(),
+        WatermarkStrategy.noWatermarks(),
+        InternalTypeInfo.of(rowType),
+        1);
+  }
+
+  private static KafkaDynamicSource createKafkaDynamicSource() {
+    Properties properties = new Properties();
+    properties.setProperty("bootstrap.servers", "localhost:9092");
+    properties.setProperty("group.id", "test-group");
+    properties.setProperty("client.id.prefix", "test-client");
+
+    return new KafkaDynamicSource(
+        DataTypes.ROW(
+            DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("name", 
DataTypes.STRING())),
+        null,
+        new TestDecodingFormat(),
+        new int[0],
+        new int[] {0, 1},
+        null,
+        List.of("test-topic"),
+        null,
+        properties,
+        StartupMode.EARLIEST,
+        Map.of(),
+        0L,
+        BoundedMode.UNBOUNDED,
+        Map.of(),
+        0L,
+        false,
+        "test-table");
+  }
+
+  private static WatermarkPushDownSpec createWatermarkPushDownSpec() {
+    IntegerType integerType = new IntegerType();
+    io.github.zhztheplayer.velox4j.type.RowType rowType =
+        new io.github.zhztheplayer.velox4j.type.RowType(List.of("id"), 
List.of(integerType));
+    ProjectNode project =
+        new ProjectNode(
+            "watermark_project",
+            List.of(new EmptyNode(rowType)),
+            List.of("watermark"),
+            List.of(FieldAccessTypedExpr.create(integerType, "id")));
+    return new WatermarkPushDownSpec(project, 1000L, 200L, -1);
+  }
+
+  private static class TestDecodingFormat
+      implements DecodingFormat<DeserializationSchema<RowData>> {
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+        DynamicTableSource.Context context, DataType producedDataType) {
+      throw new UnsupportedOperationException("Runtime decoder is not needed 
by this test.");
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+      return ChangelogMode.insertOnly();
+    }
+  }
+
+  private static class KafkaSource implements Source<RowData, TestSplit, Void> 
{
+    @Override
+    public Boundedness getBoundedness() {
+      return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, TestSplit> createReader(SourceReaderContext 
readerContext) {
+      return new TestSourceReader();
+    }
+
+    @Override
+    public SplitEnumerator<TestSplit, Void> createEnumerator(
+        SplitEnumeratorContext<TestSplit> enumContext) {
+      return new TestSplitEnumerator();
+    }
+
+    @Override
+    public SplitEnumerator<TestSplit, Void> restoreEnumerator(
+        SplitEnumeratorContext<TestSplit> enumContext, Void checkpoint) {
+      return new TestSplitEnumerator();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<TestSplit> getSplitSerializer() {
+      return new TestSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() 
{
+      return new TestCheckpointSerializer();
+    }
+  }
+
+  private static class TestSplit implements SourceSplit {
+    @Override
+    public String splitId() {
+      return "test-split";
+    }
+  }
+
+  private static class TestSourceReader implements SourceReader<RowData, 
TestSplit> {
+    @Override
+    public void start() {}
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<RowData> output) {
+      return InputStatus.NOTHING_AVAILABLE;
+    }
+
+    @Override
+    public List<TestSplit> snapshotState(long checkpointId) {
+      return List.of();
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public void addSplits(List<TestSplit> splits) {}
+
+    @Override
+    public void notifyNoMoreSplits() {}
+
+    @Override
+    public void close() {}
+  }
+
+  private static class TestSplitEnumerator implements 
SplitEnumerator<TestSplit, Void> {
+    @Override
+    public void start() {}
+
+    @Override
+    public void handleSplitRequest(int subtaskId, String requesterHostname) {}
+
+    @Override
+    public void addSplitsBack(List<TestSplit> splits, int subtaskId) {}
+
+    @Override
+    public void addReader(int subtaskId) {}
+
+    @Override
+    public Void snapshotState(long checkpointId) {
+      return null;
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  private static class TestSplitSerializer implements 
SimpleVersionedSerializer<TestSplit> {
+    @Override
+    public int getVersion() {
+      return 1;
+    }
+
+    @Override
+    public byte[] serialize(TestSplit split) {
+      return new byte[0];
+    }
+
+    @Override
+    public TestSplit deserialize(int version, byte[] serialized) {
+      return new TestSplit();
+    }
+  }
+
+  private static class TestCheckpointSerializer implements 
SimpleVersionedSerializer<Void> {
+    @Override
+    public int getVersion() {
+      return 1;
+    }
+
+    @Override
+    public byte[] serialize(Void checkpoint) throws IOException {
+      return new byte[0];
+    }
+
+    @Override
+    public Void deserialize(int version, byte[] serialized) throws IOException 
{
+      return null;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to