This is an automated email from the ASF dual-hosted git repository.
lgbo-ustc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2391c2bb8a [GLUTEN-12300][FLINK] Add support for watermark push-down
in kafka source (#12301)
2391c2bb8a is described below
commit 2391c2bb8a1612460b968f15c36a1b8e38b09f98
Author: kevinyhzou <[email protected]>
AuthorDate: Fri Jun 26 09:48:38 2026 +0800
[GLUTEN-12300][FLINK] Add support for watermark push-down in kafka source
(#12301)
* watermark pushdown
* change version for test
* fix ut
* add ut
* add ut for nexmark source
* add watermark push down for sql
* fix ut failure
* fix ut
* update version and doc
* update version
* fix pulsar build
* fix ci
* update flink.md
---
gluten-flink/docs/Flink.md | 2 +-
.../exec/stream/StreamExecTableSourceScan.java | 76 +++++-
.../functions/SubstractRexCallConverter.java | 2 +-
.../gluten/velox/KafkaSourceSinkFactory.java | 13 +-
.../table/runtime/stream/custom/NexmarkTest.java | 42 +++
.../gluten/velox/KafkaSourceSinkFactoryTest.java | 302 +++++++++++++++++++++
6 files changed, 431 insertions(+), 6 deletions(-)
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index b3322fa9d3..66387b782d 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you
have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
+git reset --hard 97fc1edafebd0f505e613d260f77f92f5252d048
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 90b3981f0f..e7ec714e70 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -16,16 +16,30 @@
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.gluten.rexnode.RexConversionContext;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
import org.apache.gluten.velox.VeloxSourceSinkFactory;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.ProjectNode;
+
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import
org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -34,13 +48,20 @@ import
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSour
import
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.TimestampType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.calcite.rex.RexNode;
+
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* Stream {@link ExecNode} to read data from an external source defined by a
{@link
@@ -98,6 +119,51 @@ public class StreamExecTableSourceScan extends
CommonExecTableSourceScan
return env.createInput(inputFormat,
outputTypeInfo).name(operatorName).getTransformation();
}
+  /**
+   * Wraps a Flink watermark expression into a Velox {@link ProjectNode}.
+   *
+   * <p>The expression is converted with {@link RexNodeConverter} against the field names of
+   * {@code inputType}. The resulting projection has a single output column named
+   * {@code "TIMESTAMP"} and an {@link EmptyNode} typed with the Velox form of {@code outputType}
+   * as its only source.
+   *
+   * @param inputType row type whose field names are used to resolve references in the expression
+   * @param outputType logical type converted to the Velox row type of the placeholder source
+   * @param watermarkExpr Calcite expression that computes the watermark
+   * @return a project node with a freshly generated plan node id
+   */
+  private ProjectNode translateWatermarkExpr(
+      LogicalType inputType, LogicalType outputType, RexNode watermarkExpr) {
+    RexConversionContext rexContext =
+        new RexConversionContext(Utils.getNamesFromRowType(inputType));
+    TypedExpr veloxWatermarkExpr = RexNodeConverter.toTypedExpr(watermarkExpr, rexContext);
+
+    io.github.zhztheplayer.velox4j.type.RowType veloxOutputType =
+        (io.github.zhztheplayer.velox4j.type.RowType) LogicalTypeConverter.toVLType(outputType);
+
+    // Arguments are evaluated left to right, so the id is generated before the source node.
+    return new ProjectNode(
+        PlanNodeIdGenerator.newId(),
+        List.of(new EmptyNode(veloxOutputType)),
+        List.of("TIMESTAMP"),
+        List.of(veloxWatermarkExpr));
+  }
+
+  /**
+   * Derives the Velox watermark push-down spec for this scan, if the planner pushed one down.
+   *
+   * <p>A spec is produced only when {@code transformation} is a {@link SourceTransformation} and
+   * the table source abilities contain a Flink {@link WatermarkPushDownSpec}. If several such
+   * abilities are present, the one that appears last in the list wins. The idle timeout is read
+   * from {@link ExecutionConfigOptions#TABLE_EXEC_SOURCE_IDLE_TIMEOUT} and the emit interval from
+   * {@link PipelineOptions#AUTO_WATERMARK_INTERVAL}, both in milliseconds.
+   *
+   * @param transformation the source transformation produced by the parent implementation
+   * @param config the exec node configuration used to look up the watermark options
+   * @return the Velox spec, or an empty optional when no watermark was pushed into the source
+   */
+  private Optional<io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec>
+      getWatermarkPushDownSpec(Transformation<RowData> transformation, ExecNodeConfig config) {
+    if (!(transformation instanceof SourceTransformation)) {
+      return Optional.empty();
+    }
+    List<SourceAbilitySpec> abilities = getTableSourceSpec().getSourceAbilities();
+    if (abilities == null) {
+      return Optional.empty();
+    }
+
+    io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec veloxSpec = null;
+    for (SourceAbilitySpec ability : abilities) {
+      if (!(ability instanceof WatermarkPushDownSpec)) {
+        continue;
+      }
+      final long idleTimeoutMillis =
+          config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT).toMillis();
+      final long watermarkIntervalMillis =
+          config.get(PipelineOptions.AUTO_WATERMARK_INTERVAL).toMillis();
+      WatermarkPushDownSpec flinkSpec = (WatermarkPushDownSpec) ability;
+
+      // The projection is typed as a single TIMESTAMP(3) column named "watermark".
+      RowType watermarkRowType =
+          new RowType(List.of(new RowField("watermark", new TimestampType(3))));
+      ProjectNode watermarkProject =
+          translateWatermarkExpr(getOutputType(), watermarkRowType, flinkSpec.getWatermarkExpr());
+
+      // NOTE(review): the meaning of the trailing -1 argument is defined by velox4j and is not
+      // visible here - confirm against the velox4j WatermarkPushDownSpec constructor.
+      veloxSpec =
+          new io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec(
+              watermarkProject, idleTimeoutMillis, watermarkIntervalMillis, -1);
+    }
+    return Optional.ofNullable(veloxSpec);
+  }
+
@Override
protected Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
@@ -106,14 +172,18 @@ public class StreamExecTableSourceScan extends
CommonExecTableSourceScan
getTableSourceSpec()
.getScanTableSource(
planner.getFlinkContext(),
ShortcutUtils.unwrapTypeFactory(planner));
- Transformation<RowData> sourceTransformation =
super.translateToPlanInternal(planner, config);
+ Transformation<RowData> transformation =
super.translateToPlanInternal(planner, config);
+ Optional<io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec>
watermarkPushDownSpec =
+ getWatermarkPushDownSpec(transformation, config);
return VeloxSourceSinkFactory.buildSource(
- sourceTransformation,
+ transformation,
Map.of(
ScanTableSource.class.getName(),
tableSource,
"checkpoint.enabled",
-
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled()));
+
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled(),
+ "watermarkPushDownSpec",
+ watermarkPushDownSpec));
// --- End Gluten-specific code changes ---
}
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubstractRexCallConverter.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubstractRexCallConverter.java
index 1c7e861595..f2e99df962 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubstractRexCallConverter.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubstractRexCallConverter.java
@@ -68,7 +68,7 @@ class SubtractRexCallConverter extends BaseRexCallConverter {
&& params.get(1).getReturnType() instanceof BigIntType) {
Type bigIntType = new BigIntType();
- TypedExpr castExpr = new CallTypedExpr(bigIntType,
List.of(params.get(0)), "cast");
+ TypedExpr castExpr = new CallTypedExpr(bigIntType,
List.of(params.get(0)), "unix_millis");
List<TypedExpr> newParams = List.of(castExpr, params.get(1));
return new CallTypedExpr(bigIntType, newParams, functionName);
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
index 8644dac3da..589e57b0a2 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
@@ -24,8 +24,11 @@ import org.apache.gluten.util.ReflectUtils;
import io.github.zhztheplayer.velox4j.connector.KafkaConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.KafkaTableHandle;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanWithWatermarkNode;
+import io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec;
import io.github.zhztheplayer.velox4j.type.RowType;
import org.apache.flink.api.connector.source.Source;
@@ -41,6 +44,7 @@ import org.apache.flink.util.FlinkRuntimeException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
@@ -68,6 +72,8 @@ public class KafkaSourceSinkFactory implements
VeloxSourceSinkFactory {
ScanTableSource tableSource =
(ScanTableSource) parameters.get(ScanTableSource.class.getName());
boolean checkpointEnabled = (Boolean)
parameters.get("checkpoint.enabled");
+ Optional<WatermarkPushDownSpec> watermarkPushDownSpec =
+ (Optional<WatermarkPushDownSpec>)
parameters.get("watermarkPushDownSpec");
Class<?> tableSourceClazz =
Class.forName("org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource");
Properties properties =
@@ -112,7 +118,12 @@ public class KafkaSourceSinkFactory implements
VeloxSourceSinkFactory {
Boolean.valueOf(kafkaTableParameters.getOrDefault("enable.auto.commit",
"false")),
"latest",
List.of());
- TableScanNode kafkaScan = new TableScanNode(planId, outputType,
kafkaTableHandle, List.of());
+
+ PlanNode kafkaScan =
+ watermarkPushDownSpec.isPresent()
+ ? new TableScanWithWatermarkNode(
+ planId, outputType, kafkaTableHandle, List.of(),
watermarkPushDownSpec.get())
+ : new TableScanNode(planId, outputType, kafkaTableHandle,
List.of());
GlutenStreamSource sourceOp =
new GlutenStreamSource(
new GlutenSourceFunction(
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index 476c4cba4d..3a40bda78c 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -41,6 +41,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -104,6 +105,45 @@ public class NexmarkTest {
tEnv = StreamTableEnvironment.create(env, settings);
}
+  /**
+   * Checks the explained plan of a query over the table created from {@code ddl_gen.sql}: the
+   * plan must contain a {@code WatermarkAssigner}, and no {@code TableSourceScan} line may carry
+   * a {@code watermark=[} attribute. The {@code datagen} table is dropped afterwards.
+   */
+  @Test
+  void testNexmarkSourceSqlDoesNotPushDownWatermark() {
+    String datagenDdl =
+        replaceVariables(
+            readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_gen.sql"), NEXMARK_VARIABLES);
+    try {
+      tEnv.executeSql(datagenDdl);
+      String plan = tEnv.explainSql("SELECT * FROM datagen");
+      assertThat(plan).contains("WatermarkAssigner");
+
+      // Only the scan lines matter: a pushed-down watermark would show up on them.
+      List<String> scanLines =
+          Arrays.stream(plan.split("\\R"))
+              .filter(planLine -> planLine.contains("TableSourceScan"))
+              .collect(Collectors.toList());
+      assertThat(scanLines)
+          .isNotEmpty()
+          .noneMatch(scanLine -> scanLine.contains("watermark=["));
+    } finally {
+      tEnv.executeSql("drop table if exists datagen");
+    }
+  }
+
+  /**
+   * Checks the explained plan of a query over the table created from {@code ddl_kafka.sql}: at
+   * least one {@code TableSourceScan} line must carry a {@code watermark=[} attribute. The
+   * {@code kafka} table is dropped afterwards.
+   */
+  @Test
+  void testKafkaSourceSqlPushesDownWatermark() {
+    String kafkaDdl =
+        replaceVariables(
+            readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_kafka.sql"), KAFKA_VARIABLES);
+    try {
+      tEnv.executeSql(kafkaDdl);
+      String plan = tEnv.explainSql("SELECT * FROM kafka");
+
+      // A pushed-down watermark is rendered as an attribute on the scan line itself.
+      List<String> scanLines =
+          Arrays.stream(plan.split("\\R"))
+              .filter(planLine -> planLine.contains("TableSourceScan"))
+              .collect(Collectors.toList());
+      assertThat(scanLines)
+          .isNotEmpty()
+          .anyMatch(scanLine -> scanLine.contains("watermark=["));
+    } finally {
+      tEnv.executeSql("drop table if exists kafka");
+    }
+  }
+
@Test
void testAllNexmarkSourceQueries()
throws ExecutionException, InterruptedException, TimeoutException {
@@ -174,6 +214,8 @@ public class NexmarkTest {
String sql = String.format("drop function if exists %s", func);
tEnv.executeSql(sql);
}
+ tEnv.executeSql("drop table if exists datagen");
+ tEnv.executeSql("drop table if exists kafka");
}
private void executeQuery(StreamTableEnvironment tEnv, String queryFileName,
boolean kafkaSource)
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/KafkaSourceSinkFactoryTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/KafkaSourceSinkFactoryTest.java
new file mode 100644
index 0000000000..1e504b3889
--- /dev/null
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/KafkaSourceSinkFactoryTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+
+import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.ProjectNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanWithWatermarkNode;
+import io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec;
+import io.github.zhztheplayer.velox4j.type.IntegerType;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests which scan plan node {@link KafkaSourceSinkFactory} builds, depending on whether a
+ * watermark push-down spec is passed in the factory parameters.
+ *
+ * <p>The Flink source, split, reader, enumerator and serializers declared at the bottom are inert
+ * stand-ins that exist only so a {@link SourceTransformation} can be constructed.
+ */
+public class KafkaSourceSinkFactoryTest {
+
+  private final KafkaSourceSinkFactory factory = new KafkaSourceSinkFactory();
+
+  /** An empty spec must yield a {@link TableScanNode} that is not a watermark-aware scan. */
+  @Test
+  public void testBuildVeloxSourceWithoutWatermarkPushDownUsesTableScan() {
+    PlanNode scanNode = buildScanNode(Optional.empty());
+
+    assertThat(scanNode)
+        .isInstanceOf(TableScanNode.class)
+        .isNotInstanceOf(TableScanWithWatermarkNode.class);
+  }
+
+  /** A present spec must yield a {@link TableScanWithWatermarkNode} holding that same spec. */
+  @Test
+  public void testBuildVeloxSourceWithWatermarkPushDownUsesTableScanWithWatermark() {
+    WatermarkPushDownSpec expectedSpec = createWatermarkPushDownSpec();
+
+    PlanNode scanNode = buildScanNode(Optional.of(expectedSpec));
+
+    assertThat(scanNode).isInstanceOf(TableScanWithWatermarkNode.class);
+    TableScanWithWatermarkNode watermarkScan = (TableScanWithWatermarkNode) scanNode;
+    assertThat(watermarkScan.getWatermarkPushDownSpec()).isSameAs(expectedSpec);
+  }
+
+  /**
+   * Runs the factory on a stub source transformation and returns the plan node held by the
+   * resulting {@link GlutenStreamSource} operator.
+   *
+   * @param watermarkPushDownSpec value passed under the {@code "watermarkPushDownSpec"} key
+   */
+  private PlanNode buildScanNode(Optional<WatermarkPushDownSpec> watermarkPushDownSpec) {
+    SourceTransformation<RowData, TestSplit, Void> stubTransformation =
+        createSourceTransformation();
+    KafkaDynamicSource tableSource = createKafkaDynamicSource();
+
+    LegacySourceTransformation<RowData> veloxTransformation =
+        (LegacySourceTransformation<RowData>)
+            factory.buildVeloxSource(
+                stubTransformation,
+                Map.of(
+                    ScanTableSource.class.getName(),
+                    tableSource,
+                    "checkpoint.enabled",
+                    false,
+                    "watermarkPushDownSpec",
+                    watermarkPushDownSpec));
+
+    GlutenStreamSource glutenSource = (GlutenStreamSource) veloxTransformation.getOperator();
+    return glutenSource.getPlanNode().getNode();
+  }
+
+  /** Row type {@code (id INT, name STRING)} shared by the stub transformation and table source. */
+  private static DataType physicalRowDataType() {
+    return DataTypes.ROW(
+        DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("name", DataTypes.STRING()));
+  }
+
+  /** Creates a parallelism-1 source transformation over the stub {@link KafkaSource}. */
+  private static SourceTransformation<RowData, TestSplit, Void> createSourceTransformation() {
+    RowType producedRowType = (RowType) physicalRowDataType().getLogicalType();
+    return new SourceTransformation<>(
+        "KafkaSource",
+        new KafkaSource(),
+        WatermarkStrategy.noWatermarks(),
+        InternalTypeInfo.of(producedRowType),
+        1);
+  }
+
+  /** Creates a Kafka table source for topic {@code test-topic} with minimal client properties. */
+  private static KafkaDynamicSource createKafkaDynamicSource() {
+    Properties kafkaProperties = new Properties();
+    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
+    kafkaProperties.setProperty("group.id", "test-group");
+    kafkaProperties.setProperty("client.id.prefix", "test-client");
+
+    return new KafkaDynamicSource(
+        physicalRowDataType(),
+        null,
+        new TestDecodingFormat(),
+        new int[0],
+        new int[] {0, 1},
+        null,
+        List.of("test-topic"),
+        null,
+        kafkaProperties,
+        StartupMode.EARLIEST,
+        Map.of(),
+        0L,
+        BoundedMode.UNBOUNDED,
+        Map.of(),
+        0L,
+        false,
+        "test-table");
+  }
+
+  /**
+   * Creates a spec whose projection exposes the integer field {@code id} as column
+   * {@code watermark} on top of an {@link EmptyNode}.
+   */
+  private static WatermarkPushDownSpec createWatermarkPushDownSpec() {
+    IntegerType idType = new IntegerType();
+    io.github.zhztheplayer.velox4j.type.RowType sourceRowType =
+        new io.github.zhztheplayer.velox4j.type.RowType(List.of("id"), List.of(idType));
+    ProjectNode watermarkProject =
+        new ProjectNode(
+            "watermark_project",
+            List.of(new EmptyNode(sourceRowType)),
+            List.of("watermark"),
+            List.of(FieldAccessTypedExpr.create(idType, "id")));
+    return new WatermarkPushDownSpec(watermarkProject, 1000L, 200L, -1);
+  }
+
+  /** Insert-only decoding format whose runtime decoder is never expected to be requested. */
+  private static class TestDecodingFormat
+      implements DecodingFormat<DeserializationSchema<RowData>> {
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+        DynamicTableSource.Context context, DataType producedDataType) {
+      throw new UnsupportedOperationException("Runtime decoder is not needed by this test.");
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+      return ChangelogMode.insertOnly();
+    }
+  }
+
+  /** Unbounded stub source that hands out the inert reader, enumerator and serializers below. */
+  private static class KafkaSource implements Source<RowData, TestSplit, Void> {
+    @Override
+    public Boundedness getBoundedness() {
+      return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, TestSplit> createReader(SourceReaderContext readerContext) {
+      return new TestSourceReader();
+    }
+
+    @Override
+    public SplitEnumerator<TestSplit, Void> createEnumerator(
+        SplitEnumeratorContext<TestSplit> enumContext) {
+      return new TestSplitEnumerator();
+    }
+
+    @Override
+    public SplitEnumerator<TestSplit, Void> restoreEnumerator(
+        SplitEnumeratorContext<TestSplit> enumContext, Void checkpoint) {
+      return new TestSplitEnumerator();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<TestSplit> getSplitSerializer() {
+      return new TestSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
+      return new TestCheckpointSerializer();
+    }
+  }
+
+  /** Split with the fixed id {@code test-split}. */
+  private static class TestSplit implements SourceSplit {
+    @Override
+    public String splitId() {
+      return "test-split";
+    }
+  }
+
+  /** Reader that never produces records and keeps no state. */
+  private static class TestSourceReader implements SourceReader<RowData, TestSplit> {
+    @Override
+    public void start() {}
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<RowData> output) {
+      return InputStatus.NOTHING_AVAILABLE;
+    }
+
+    @Override
+    public List<TestSplit> snapshotState(long checkpointId) {
+      return List.of();
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public void addSplits(List<TestSplit> splits) {}
+
+    @Override
+    public void notifyNoMoreSplits() {}
+
+    @Override
+    public void close() {}
+  }
+
+  /** Enumerator whose callbacks are all no-ops and whose snapshot is {@code null}. */
+  private static class TestSplitEnumerator implements SplitEnumerator<TestSplit, Void> {
+    @Override
+    public void start() {}
+
+    @Override
+    public void handleSplitRequest(int subtaskId, String requesterHostname) {}
+
+    @Override
+    public void addSplitsBack(List<TestSplit> splits, int subtaskId) {}
+
+    @Override
+    public void addReader(int subtaskId) {}
+
+    @Override
+    public Void snapshotState(long checkpointId) {
+      return null;
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  /** Version-1 serializer that writes no bytes and always reads back a new {@link TestSplit}. */
+  private static class TestSplitSerializer implements SimpleVersionedSerializer<TestSplit> {
+    @Override
+    public int getVersion() {
+      return 1;
+    }
+
+    @Override
+    public byte[] serialize(TestSplit split) {
+      return new byte[0];
+    }
+
+    @Override
+    public TestSplit deserialize(int version, byte[] serialized) {
+      return new TestSplit();
+    }
+  }
+
+  /** Version-1 serializer for the {@code Void} checkpoint: writes no bytes, reads {@code null}. */
+  private static class TestCheckpointSerializer implements SimpleVersionedSerializer<Void> {
+    @Override
+    public int getVersion() {
+      return 1;
+    }
+
+    @Override
+    public byte[] serialize(Void checkpoint) throws IOException {
+      return new byte[0];
+    }
+
+    @Override
+    public Void deserialize(int version, byte[] serialized) throws IOException {
+      return null;
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]