This is an automated email from the ASF dual-hosted git repository.

lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e5138c941a [GLUTEN-9553][FLINK] Support kafka connector for data source (#11312)
e5138c941a is described below

commit e5138c941ab111bac89e3e4bc0ba8534df2ba8f9
Author: kevinyhzou <[email protected]>
AuthorDate: Thu Dec 18 18:10:37 2025 +0800

    [GLUTEN-9553][FLINK] Support kafka connector for data source (#11312)
    
    * support kafka data source
    
    * fix reviews
    
    * optimize code
---
 .github/workflows/flink.yml                        |   2 +-
 gluten-flink/docs/Flink.md                         |   2 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |   6 +-
 .../exec/stream/StreamExecTableSourceScan.java     |  11 +-
 ...Builder.java => FromElementsSourceFactory.java} |  30 ++--
 .../gluten/velox/KafkaSourceSinkFactory.java       | 141 +++++++++++++++++
 .../apache/gluten/velox/NexmarkSourceFactory.java  | 103 +++++++++++++
 ...VeloxSinkBuilder.java => PrintSinkFactory.java} |  26 +++-
 .../org.apache.gluten.velox.VeloxSourceSinkFactory |   4 +
 .../SourceTransformationTranslator.java            | 168 ---------------------
 .../gluten/velox/VeloxSourceSinkFactory.java       |  84 +++++++++++
 gluten-flink/ut/pom.xml                            |  30 ++++
 .../table/runtime/stream/custom/NexmarkTest.java   | 102 +++++++++++--
 .../ut/src/test/resources/nexmark/ddl_kafka.sql    |  46 ++++++
 .../ut/src/test/resources/nexmark/ddl_views.sql    |   6 +-
 15 files changed, 557 insertions(+), 204 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 4c0f1fdd4e..099ee73438 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -61,7 +61,7 @@ jobs:
          sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
           sudo .github/workflows/util/install-flink-resources.sh
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 14eea127c5088f972cdf1ca0987fd95429485a0e
+          cd velox4j && git reset --hard 1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index bc3e2ed092..a73b56bb32 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 14eea127c5088f972cdf1ca0987fd95429485a0e
+git reset --hard 1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index f11d3f7958..a1b0e0e7b7 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -19,11 +19,12 @@ package org.apache.flink.table.planner.plan.nodes.exec.common;
 import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
-import org.apache.gluten.velox.VeloxSinkBuilder;
+import org.apache.gluten.velox.VeloxSourceSinkFactory;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
@@ -470,7 +471,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         Transformation sinkTransformation =
             createSinkFunctionTransformation(
                sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism);
-        return VeloxSinkBuilder.build(env.getConfiguration(), sinkTransformation);
+        return VeloxSourceSinkFactory.buildSink(
+            sinkTransformation, Map.of(Configuration.class.getName(), env.getConfiguration()));
         // --- End Gluten-specific code changes ---
       } else if (runtimeProvider instanceof OutputFormatProvider) {
         OutputFormat<RowData> outputFormat =
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 07189d5f5e..90b3981f0f 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -16,7 +16,7 @@
  */
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
-import org.apache.gluten.velox.VeloxSourceBuilder;
+import org.apache.gluten.velox.VeloxSourceSinkFactory;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.io.InputFormat;
@@ -40,6 +40,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.Collections;
+import java.util.Map;
 
 /**
  * Stream {@link ExecNode} to read data from an external source defined by a {@link
@@ -106,7 +107,13 @@ public class StreamExecTableSourceScan extends CommonExecTableSourceScan
             .getScanTableSource(
                planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner));
     Transformation<RowData> sourceTransformation = super.translateToPlanInternal(planner, config);
-    return VeloxSourceBuilder.build(sourceTransformation, tableSource);
+    return VeloxSourceSinkFactory.buildSource(
+        sourceTransformation,
+        Map.of(
+            ScanTableSource.class.getName(),
+            tableSource,
+            "checkpoint.enabled",
+            planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled()));
     // --- End Gluten-specific code changes ---
   }
 }
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java
similarity index 81%
rename from gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
rename to gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java
index 7c56bef729..da31edeccd 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java
@@ -28,6 +28,8 @@ import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.plan.TableScanNode;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.RowData;
@@ -40,25 +42,27 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-public class VeloxSourceBuilder {
+public class FromElementsSourceFactory implements VeloxSourceSinkFactory {
 
-  public static Transformation<RowData> build(
-      Transformation<RowData> transformation, ScanTableSource scanTableSource) {
+  @SuppressWarnings("rawtypes")
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
     if (transformation instanceof LegacySourceTransformation) {
-      if (scanTableSource.getClass().getSimpleName().equals("TestValuesScanLookupTableSource")) {
-        return buildFromElementsSource(transformation, scanTableSource);
-      }
+      StreamSource source = ((LegacySourceTransformation) transformation).getOperator();
+      return source.getUserFunction() instanceof FromElementsFunction;
     }
-    return transformation;
+    return false;
   }
 
-  /** `FromElementsSource` is designed for ut tests, and we map it to velox source. */
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private static Transformation<RowData> buildFromElementsSource(
-      Transformation<RowData> transformation, ScanTableSource tableSource) {
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
     LegacySourceTransformation<RowData> sourceTransformation =
         (LegacySourceTransformation<RowData>) transformation;
     try {
+      ScanTableSource tableSource =
+          (ScanTableSource) parameters.get(ScanTableSource.class.getName());
       Class<?> tableSourceClazz =
           Class.forName(
               "org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown");
@@ -106,4 +110,10 @@ public class VeloxSourceBuilder {
       throw new FlinkRuntimeException(e);
     }
   }
+
+  @Override
+  public Transformation<RowData> buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    throw new FlinkRuntimeException("Unimplemented method 'buildSink'");
+  }
 }
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
new file mode 100644
index 0000000000..54e2b8b436
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.KafkaConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.KafkaTableHandle;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+public class KafkaSourceSinkFactory implements VeloxSourceSinkFactory {
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
+    if (transformation instanceof SourceTransformation) {
+      Source source = ((SourceTransformation) transformation).getSource();
+      return source.getClass().getSimpleName().equals("KafkaSource");
+    }
+    return false;
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    RowType outputType =
+        (RowType)
+            LogicalTypeConverter.toVLType(
+                ((InternalTypeInfo<?>) transformation.getOutputType()).toLogicalType());
+    try {
+      ScanTableSource tableSource =
+          (ScanTableSource) parameters.get(ScanTableSource.class.getName());
+      boolean checkpointEnabled = (Boolean) parameters.get("checkpoint.enabled");
+      Class<?> tableSourceClazz =
+          Class.forName("org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource");
+      Properties properties =
+          (Properties) ReflectUtils.getObjectField(tableSourceClazz, tableSource, "properties");
+      List<String> topics =
+          (List<String>) ReflectUtils.getObjectField(tableSourceClazz, tableSource, "topics");
+      DecodingFormat decodingFormat =
+          (DecodingFormat)
+              ReflectUtils.getObjectField(tableSourceClazz, tableSource, "valueDecodingFormat");
+      String startupMode =
+          String.valueOf(ReflectUtils.getObjectField(tableSourceClazz, tableSource, "startupMode"));
+      String connectorId = "connector-kafka";
+      String planId = PlanNodeIdGenerator.newId();
+      String topic = topics.get(0);
+      String format =
+          decodingFormat.getClass().getName().contains("JsonFormatFactory") ? "json" : "raw";
+      Map<String, String> kafkaTableParameters = new HashMap<String, String>();
+      for (String key : properties.stringPropertyNames()) {
+        kafkaTableParameters.put(key, properties.getProperty(key));
+      }
+      kafkaTableParameters.put("topic", topic);
+      kafkaTableParameters.put("format", format);
+      kafkaTableParameters.put(
+          "scan.startup.mode",
+          startupMode.equals("LATEST")
+              ? "latest-offsets"
+              : startupMode.equals("EARLIEST") ? "earliest-offsets" : "group-offsets");
+      kafkaTableParameters.put("enable.auto.commit", checkpointEnabled ? 
"false" : "true");
+      kafkaTableParameters.put(
+          "client.id",
+          properties.getProperty("client.id.prefix", connectorId) + "-" + UUID.randomUUID());
+      KafkaTableHandle kafkaTableHandle =
+          new KafkaTableHandle(connectorId, topic, outputType, kafkaTableParameters);
+      KafkaConnectorSplit connectorSplit =
+          new KafkaConnectorSplit(
+              connectorId,
+              0,
+              false,
+              kafkaTableParameters.get("bootstrap.servers"),
+              kafkaTableParameters.get("group.id"),
+              format,
+              Boolean.valueOf(kafkaTableParameters.getOrDefault("enable.auto.commit", "false")),
+              "latest",
+              List.of());
+      TableScanNode kafkaScan = new TableScanNode(planId, outputType, kafkaTableHandle, List.of());
+      GlutenStreamSource sourceOp =
+          new GlutenStreamSource(
+              new GlutenVectorSourceFunction(
+                  new StatefulPlanNode(kafkaScan.getId(), kafkaScan),
+                  Map.of(kafkaScan.getId(), outputType),
+                  kafkaScan.getId(),
+                  connectorSplit));
+      SourceTransformation sourceTransformation = (SourceTransformation) transformation;
+      return new LegacySourceTransformation<RowData>(
+          sourceTransformation.getName(),
+          sourceOp,
+          transformation.getOutputType(),
+          sourceTransformation.getParallelism(),
+          sourceTransformation.getBoundedness(),
+          false);
+    } catch (Exception e) {
+      throw new FlinkRuntimeException(e);
+    }
+  }
+
+  @Override
+  public Transformation<RowData> buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    throw new FlinkRuntimeException("Unimplemented method 'buildSink'");
+  }
+}
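For orientation, here is a hedged sketch of how this factory is reached at planning time, mirroring the StreamExecTableSourceScan change above. The helper class and its name are hypothetical illustrations, not part of this commit:

    package org.apache.gluten.velox;

    import org.apache.flink.api.dag.Transformation;
    import org.apache.flink.table.connector.source.ScanTableSource;
    import org.apache.flink.table.data.RowData;

    import java.util.Map;

    // Hypothetical illustration of the dispatch performed in
    // StreamExecTableSourceScan#translateToPlanInternal.
    final class KafkaOffloadExample {
      static Transformation<RowData> offload(
          Transformation<RowData> scanTransformation,
          ScanTableSource tableSource,
          boolean checkpointingEnabled) {
        // Parameters are loosely typed and keyed by class name; the
        // "checkpoint.enabled" flag drives enable.auto.commit above.
        Map<String, Object> parameters =
            Map.of(
                ScanTableSource.class.getName(), tableSource,
                "checkpoint.enabled", checkpointingEnabled);
        // buildSource walks the ServiceLoader-registered factories; only
        // KafkaSourceSinkFactory#match returns true for a KafkaSource-backed
        // SourceTransformation, so its buildVeloxSource runs here.
        return VeloxSourceSinkFactory.buildSource(scanTransformation, parameters);
      }
    }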
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
new file mode 100644
index 0000000000..736f3cc3c7
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public class NexmarkSourceFactory implements VeloxSourceSinkFactory {
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
+    if (transformation instanceof SourceTransformation) {
+      Class<?> sourceClazz = ((SourceTransformation) transformation).getSource().getClass();
+      return sourceClazz.getSimpleName().equals("NexmarkSource");
+    }
+    return false;
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    RowType outputType =
+        (RowType)
+            LogicalTypeConverter.toVLType(
+                ((InternalTypeInfo) transformation.getOutputType()).toLogicalType());
+    Object nexmarkSource = ((SourceTransformation) transformation).getSource();
+    String id = PlanNodeIdGenerator.newId();
+    List<?> nexmarkSourceSplits =
+        (List<?>)
+            ReflectUtils.invokeObjectMethod(
+                nexmarkSource.getClass(),
+                nexmarkSource,
+                "getSplits",
+                new Class<?>[] {int.class},
+                new Object[] {transformation.getParallelism()});
+    Object nexmarkSourceSplit = nexmarkSourceSplits.get(0);
+    Object generatorConfig =
+        ReflectUtils.getObjectField(
+            nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig");
+    Long maxEvents =
+        (Long)
+            ReflectUtils.getObjectField(generatorConfig.getClass(), generatorConfig, "maxEvents");
+    PlanNode tableScan =
+        new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of());
+    GlutenStreamSource sourceOp =
+        new GlutenStreamSource(
+            new GlutenVectorSourceFunction(
+                new StatefulPlanNode(tableScan.getId(), tableScan),
+                Map.of(id, outputType),
+                id,
+                new NexmarkConnectorSplit(
+                    "connector-nexmark",
+                    maxEvents > Integer.MAX_VALUE ? Integer.MAX_VALUE : maxEvents.intValue())));
+    return new LegacySourceTransformation<RowData>(
+        transformation.getName(),
+        sourceOp,
+        transformation.getOutputType(),
+        transformation.getParallelism(),
+        ((SourceTransformation) transformation).getBoundedness(),
+        false);
+  }
+
+  @Override
+  public Transformation<RowData> buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    throw new UnsupportedOperationException("Unimplemented method 'buildSink'");
+  }
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
similarity index 85%
rename from gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
rename to gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
index f27dec0a44..b00a76a21f 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
@@ -31,11 +31,12 @@ import io.github.zhztheplayer.velox4j.type.RowType;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.operators.sink.SinkOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -43,9 +44,11 @@ import org.apache.flink.util.FlinkRuntimeException;
 import java.util.List;
 import java.util.Map;
 
-public class VeloxSinkBuilder {
+public class PrintSinkFactory implements VeloxSourceSinkFactory {
 
-  public static Transformation build(ReadableConfig config, Transformation transformation) {
+  @SuppressWarnings("rawtypes")
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
     if (transformation instanceof LegacySinkTransformation) {
       SimpleOperatorFactory operatorFactory =
           (SimpleOperatorFactory) ((LegacySinkTransformation) transformation).getOperatorFactory();
@@ -56,16 +59,25 @@ public class VeloxSinkBuilder {
               .getClass()
               .getSimpleName()
               .equals("RowDataPrintFunction")) {
-        return buildPrintSink(config, (LegacySinkTransformation) transformation);
+        return true;
       }
     }
-    return transformation;
+    return false;
   }
 
-  private static LegacySinkTransformation buildPrintSink(
-      ReadableConfig config, LegacySinkTransformation transformation) {
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    throw new FlinkRuntimeException("Unimplemented method 'buildSource'");
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public Transformation buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
     Transformation inputTrans = (Transformation) transformation.getInputs().get(0);
     InternalTypeInfo inputTypeInfo = (InternalTypeInfo) inputTrans.getOutputType();
+    Configuration config = (Configuration) parameters.get(Configuration.class.getName());
     String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
     String printPath;
     if (logDir != null) {
diff --git a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
new file mode 100644
index 0000000000..9d7623b7ec
--- /dev/null
+++ b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
@@ -0,0 +1,4 @@
+org.apache.gluten.velox.FromElementsSourceFactory
+org.apache.gluten.velox.KafkaSourceSinkFactory
+org.apache.gluten.velox.PrintSinkFactory
+org.apache.gluten.velox.NexmarkSourceFactory
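Because discovery goes through java.util.ServiceLoader, supporting another connector needs only a new implementation plus one line in this services file. Below is a minimal skeleton against the VeloxSourceSinkFactory interface added further down; the class name is hypothetical and not part of this commit:

    package org.apache.gluten.velox;

    import org.apache.flink.api.dag.Transformation;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.util.FlinkRuntimeException;

    import java.util.Map;

    // Hypothetical factory; register it by appending
    // "org.apache.gluten.velox.MyConnectorSourceFactory" to the services file.
    public class MyConnectorSourceFactory implements VeloxSourceSinkFactory {

      @Override
      public boolean match(Transformation<RowData> transformation) {
        // Inspect the transformation (e.g. its Source class) and return true
        // only for the connector this factory knows how to offload.
        return false;
      }

      @Override
      public Transformation<RowData> buildVeloxSource(
          Transformation<RowData> transformation, Map<String, Object> parameters) {
        // Translate the Flink source into a Velox TableScanNode here.
        throw new FlinkRuntimeException("Unimplemented method 'buildVeloxSource'");
      }

      @Override
      public Transformation<RowData> buildVeloxSink(
          Transformation<RowData> transformation, Map<String, Object> parameters) {
        throw new FlinkRuntimeException("Unimplemented method 'buildVeloxSink'");
      }
    }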
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
deleted file mode 100644
index 5e7dd62f0d..0000000000
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.translators;
-
-import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
-import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
-import org.apache.gluten.util.LogicalTypeConverter;
-import org.apache.gluten.util.PlanNodeIdGenerator;
-import org.apache.gluten.util.ReflectUtils;
-
-import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
-import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
-import io.github.zhztheplayer.velox4j.plan.PlanNode;
-import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.TableScanNode;
-import io.github.zhztheplayer.velox4j.type.RowType;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TransformationTranslator} for the {@link SourceTransformation}.
- *
- * @param <OUT> The type of the elements that this source produces.
- */
-@Internal
-public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, EnumChkT>
-    extends SimpleTransformationTranslator<OUT, SourceTransformation<OUT, SplitT, EnumChkT>> {
-
-  @Override
-  protected Collection<Integer> translateForBatchInternal(
-      final SourceTransformation<OUT, SplitT, EnumChkT> transformation, final Context context) {
-
-    return translateInternal(
-        transformation, context, false /* don't emit progressive watermarks */);
-  }
-
-  @Override
-  protected Collection<Integer> translateForStreamingInternal(
-      final SourceTransformation<OUT, SplitT, EnumChkT> transformation, final Context context) {
-
-    return translateInternal(transformation, context, true /* emit progressive watermarks */);
-  }
-
-  private Collection<Integer> translateInternal(
-      final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
-      final Context context,
-      boolean emitProgressiveWatermarks) {
-    checkNotNull(transformation);
-    checkNotNull(context);
-
-    final StreamGraph streamGraph = context.getStreamGraph();
-    final String slotSharingGroup = context.getSlotSharingGroup();
-    final int transformationId = transformation.getId();
-    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
-
-    // --- Begin Gluten-specific code changes ---
-    Class<?> sourceClazz = transformation.getSource().getClass();
-    if (sourceClazz.getSimpleName().equals("NexmarkSource")) {
-      RowType outputType =
-          (RowType)
-              LogicalTypeConverter.toVLType(
-                  ((InternalTypeInfo) transformation.getOutputType()).toLogicalType());
-      String id = PlanNodeIdGenerator.newId();
-      Object nexmarkSource = transformation.getSource();
-      List<?> nexmarkSourceSplits =
-          (List<?>)
-              ReflectUtils.invokeObjectMethod(
-                  sourceClazz,
-                  nexmarkSource,
-                  "getSplits",
-                  new Class<?>[] {int.class},
-                  new Object[] {transformation.getParallelism()});
-      Object nexmarkSourceSplit = nexmarkSourceSplits.get(0);
-      Object generatorConfig =
-          ReflectUtils.getObjectField(
-              nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig");
-      Long maxEvents =
-          (Long)
-              ReflectUtils.getObjectField(generatorConfig.getClass(), generatorConfig, "maxEvents");
-      PlanNode tableScan =
-          new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of());
-      StreamOperatorFactory<OUT> operatorFactory =
-          SimpleOperatorFactory.of(
-              new GlutenStreamSource(
-                  new GlutenVectorSourceFunction(
-                      new StatefulPlanNode(tableScan.getId(), tableScan),
-                      Map.of(id, outputType),
-                      id,
-                      new NexmarkConnectorSplit(
-                          "connector-nexmark",
-                          maxEvents > Integer.MAX_VALUE
-                              ? Integer.MAX_VALUE
-                              : maxEvents.intValue()))));
-      streamGraph.addLegacySource(
-          transformationId,
-          slotSharingGroup,
-          transformation.getCoLocationGroupKey(),
-          operatorFactory,
-          null,
-          transformation.getOutputType(),
-          "Source: " + transformation.getName());
-    } else {
-      SourceOperatorFactory<OUT> operatorFactory =
-          new SourceOperatorFactory<>(
-              transformation.getSource(),
-              transformation.getWatermarkStrategy(),
-              emitProgressiveWatermarks);
-
-      operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
-      operatorFactory.setCoordinatorListeningID(transformation.getCoordinatorListeningID());
-
-      streamGraph.addSource(
-          transformationId,
-          slotSharingGroup,
-          transformation.getCoLocationGroupKey(),
-          operatorFactory,
-          null,
-          transformation.getOutputType(),
-          "Source: " + transformation.getName());
-    }
-    // --- End Gluten-specific code changes ---
-
-    final int parallelism =
-        transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
-            ? transformation.getParallelism()
-            : executionConfig.getParallelism();
-
-    streamGraph.setParallelism(
-        transformationId, parallelism, transformation.isParallelismConfigured());
-    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
-
-    streamGraph.setSupportsConcurrentExecutionAttempts(
-        transformationId, transformation.isSupportsConcurrentExecutionAttempts());
-
-    return Collections.singleton(transformationId);
-  }
-}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
new file mode 100644
index 0000000000..aff1bb7795
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+
+public interface VeloxSourceSinkFactory {
+
+  static final Logger LOG = LoggerFactory.getLogger(VeloxSourceSinkFactory.class);
+
+  /** Determine whether the given transformation can be offloaded to Velox. */
+  boolean match(Transformation<RowData> transformation);
+
+  /** Build a source transformation that offloads the operator to Velox. */
+  Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters);
+
+  /** Build a sink transformation that offloads the operator to Velox. */
+  Transformation<RowData> buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters);
+
+  /** Choose the matching source/sink factory for the given transformation. */
+  private static Optional<VeloxSourceSinkFactory> getFactory(
+      Transformation<RowData> transformation) {
+    ServiceLoader<VeloxSourceSinkFactory> factories =
+        ServiceLoader.load(VeloxSourceSinkFactory.class);
+    for (VeloxSourceSinkFactory factory : factories) {
+      if (factory.match(transformation)) {
+        return Optional.of(factory);
+      }
+    }
+    return Optional.empty();
+  }
+
+  /** Build a Velox source, or fall back to the original Flink source. */
+  static Transformation<RowData> buildSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    Optional<VeloxSourceSinkFactory> factory = getFactory(transformation);
+    if (factory.isEmpty()) {
+      LOG.warn(
+          "Not find matched factory to build velox source transformation, and 
we will use flink original transformation {} instead.",
+          transformation.getClass().getName());
+      return transformation;
+    } else {
+      return factory.get().buildVeloxSource(transformation, parameters);
+    }
+  }
+
+  /** Build a Velox sink, or fall back to the original Flink sink. */
+  static Transformation<RowData> buildSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    Optional<VeloxSourceSinkFactory> factory = getFactory(transformation);
+    if (factory.isEmpty()) {
+      LOG.warn(
+          "Not find matched factory to build velox sink transformation, and we 
will use flink original transformation {} instead.",
+          transformation.getClass().getName());
+      return transformation;
+    } else {
+      return factory.get().buildVeloxSink(transformation, parameters);
+    }
+  }
+}
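The sink path is symmetric. Here is a hedged sketch mirroring the CommonExecSink change above; the helper class and its arguments are hypothetical placeholders for planner-provided objects:

    package org.apache.gluten.velox;

    import org.apache.flink.api.dag.Transformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.data.RowData;

    import java.util.Map;

    // Hypothetical illustration of the sink-side dispatch in CommonExecSink.
    final class PrintSinkOffloadExample {
      static Transformation<RowData> offload(
          Transformation<RowData> sinkTransformation, Configuration configuration) {
        // buildSink returns the input transformation unchanged when no
        // registered factory matches, so unsupported sinks fall back to Flink.
        return VeloxSourceSinkFactory.buildSink(
            sinkTransformation, Map.of(Configuration.class.getName(), configuration));
      }
    }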
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index 3583f7823f..74311055a6 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -152,6 +152,36 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.salesforce.kafka.test</groupId>
+      <artifactId>kafka-junit5</artifactId>
+      <version>3.2.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>3.4.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-kafka</artifactId>
+      <version>3.3.0-1.19</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-base</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index 5454b29396..476c4cba4d 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -18,13 +18,18 @@ package org.apache.gluten.table.runtime.stream.custom;
 
 import org.apache.gluten.table.runtime.stream.common.Velox4jEnvironment;
 
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
+import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
+import com.salesforce.kafka.test.listeners.PlainListener;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,9 +66,30 @@ public class NexmarkTest {
           put("PERSON_PROPORTION", "1");
           put("AUCTION_PROPORTION", "3");
           put("BID_PROPORTION", "46");
+          put("NEXMARK_TABLE", "datagen");
         }
       };
 
+  private static final int KAFKA_PORT = 9092;
+  private static String topicName = "nexmark";
+
+  @RegisterExtension
+  public static final SharedKafkaTestResource kafkaInstance =
+      new SharedKafkaTestResource()
+          .withBrokers(1)
+          .registerListener(new PlainListener().onPorts(KAFKA_PORT));
+
+  private static final Map<String, String> KAFKA_VARIABLES =
+      new HashMap<>() {
+        {
+          put("BOOTSTRAP_SERVERS", "localhost:9092");
+          put("NEXMARK_TABLE", "kafka");
+        }
+      };
+
+  private static final List<String> VIEWS = List.of("person", "auction", "bid", "B");
+  private static final List<String> FUNCTIONS = List.of("count_char");
+
   private static StreamTableEnvironment tEnv;
 
   @BeforeAll
@@ -76,31 +102,49 @@ public class NexmarkTest {
 
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
     tEnv = StreamTableEnvironment.create(env, settings);
-
-    setupNexmarkEnvironment(tEnv);
   }
 
   @Test
-  void testAllNexmarkQueries() throws ExecutionException, InterruptedException, TimeoutException {
+  void testAllNexmarkSourceQueries()
+      throws ExecutionException, InterruptedException, TimeoutException {
+    setupNexmarkEnvironment(tEnv, "ddl_gen.sql", NEXMARK_VARIABLES);
     List<String> queryFiles = getQueries();
     assertThat(queryFiles).isNotEmpty();
+    LOG.warn("Found {} Nexmark query files: {}", queryFiles.size(), 
queryFiles);
 
+    for (String queryFile : queryFiles) {
+      LOG.warn("Executing nextmark query from file: {}", queryFile);
+      executeQuery(tEnv, queryFile, false);
+    }
+    clearEnvironment(tEnv);
+  }
+
+  @Test
+  void testAllKafkaSourceQueries()
+      throws ExecutionException, InterruptedException, TimeoutException {
+    kafkaInstance.getKafkaTestUtils().createTopic(topicName, 1, (short) 1);
+    setupNexmarkEnvironment(tEnv, "ddl_kafka.sql", KAFKA_VARIABLES);
+    List<String> queryFiles = getQueries();
+    assertThat(queryFiles).isNotEmpty();
     LOG.warn("Found {} Nexmark query files: {}", queryFiles.size(), 
queryFiles);
 
     for (String queryFile : queryFiles) {
-      LOG.warn("Executing query from file: {}", queryFile);
-      executeQuery(tEnv, queryFile);
+      LOG.warn("Executing kafka query from file:{}", queryFile);
+      executeQuery(tEnv, queryFile, true);
     }
+    clearEnvironment(tEnv);
   }
 
-  private static void setupNexmarkEnvironment(StreamTableEnvironment tEnv) {
-    String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_gen.sql");
-    createNexmarkSource = replaceVariables(createNexmarkSource, NEXMARK_VARIABLES);
+  private static void setupNexmarkEnvironment(
+      StreamTableEnvironment tEnv, String sourceFileName, Map<String, String> variables) {
+    String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + sourceFileName);
+    createNexmarkSource = replaceVariables(createNexmarkSource, variables);
     tEnv.executeSql(createNexmarkSource);
 
     String createTableView = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_views.sql");
     String[] sqlTableView = createTableView.split(";");
     for (String sql : sqlTableView) {
+      sql = replaceVariables(sql, variables);
       String trimmedSql = sql.trim();
       if (!trimmedSql.isEmpty()) {
         tEnv.executeSql(trimmedSql);
@@ -116,7 +160,23 @@ public class NexmarkTest {
     return result;
   }
 
-  private void executeQuery(StreamTableEnvironment tEnv, String queryFileName)
+  private static void clearEnvironment(StreamTableEnvironment tEnv) {
+    for (int i = 0; i <= 22; ++i) {
+      String tableName = "nexmark_q" + i;
+      String sql = String.format("drop table if exists %s", tableName);
+      tEnv.executeSql(sql);
+    }
+    for (String view : VIEWS) {
+      String sql = String.format("drop view if exists %s", view);
+      tEnv.executeSql(sql);
+    }
+    for (String func : FUNCTIONS) {
+      String sql = String.format("drop function if exists %s", func);
+      tEnv.executeSql(sql);
+    }
+  }
+
+  private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boolean kafkaSource)
       throws ExecutionException, InterruptedException, TimeoutException {
     String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + queryFileName);
 
@@ -136,7 +196,11 @@ public class NexmarkTest {
     String insertQuery = sqlStatements[sqlStatements.length - 2].trim();
     if (!insertQuery.isEmpty()) {
       TableResult insertResult = tEnv.executeSql(insertQuery);
-      waitForJobCompletion(insertResult, 30000);
+      if (kafkaSource) {
+        assertThat(checkJobRunningStatus(insertResult, 30000)).isTrue();
+      } else {
+        waitForJobCompletion(insertResult, 30000);
+      }
     }
     assertTrue(sqlStatements[sqlStatements.length - 1].trim().isEmpty());
   }
@@ -147,6 +211,24 @@ public class NexmarkTest {
     result.getJobClient().get().getJobExecutionResult().get(timeoutMs, TimeUnit.MILLISECONDS);
   }
 
+  private boolean checkJobRunningStatus(TableResult result, long timeoutMs)
+      throws ExecutionException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    assertTrue(result.getJobClient().isPresent());
+    JobClient jobClient = result.getJobClient().get();
+    while (System.currentTimeMillis() < startTime + timeoutMs) {
+      if (jobClient.getJobStatus().get() == JobStatus.RUNNING) {
+        jobClient.cancel();
+        return true;
+      } else {
+        Thread.sleep(1000);
+      }
+    }
+    LOG.warn("Job not running in " + timeoutMs + " millseconds.");
+    jobClient.cancel();
+    return false;
+  }
+
   private List<String> getQueries() {
     URL resourceUrl = getClass().getClassLoader().getResource(NEXMARK_RESOURCE_DIR);
 
diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql b/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql
new file mode 100644
index 0000000000..27757eaeaf
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql
@@ -0,0 +1,46 @@
+CREATE TABLE kafka (
+    event_type int,
+    person ROW<
+        id  BIGINT,
+        name  VARCHAR,
+        emailAddress  VARCHAR,
+        creditCard  VARCHAR,
+        city  VARCHAR,
+        state  VARCHAR,
+        `dateTime` TIMESTAMP(3),
+        extra  VARCHAR>,
+    auction ROW<
+        id  BIGINT,
+        itemName  VARCHAR,
+        description  VARCHAR,
+        initialBid  BIGINT,
+        reserve  BIGINT,
+        `dateTime`  TIMESTAMP(3),
+        expires  TIMESTAMP(3),
+        seller  BIGINT,
+        category  BIGINT,
+        extra  VARCHAR>,
+    bid ROW<
+        auction  BIGINT,
+        bidder  BIGINT,
+        price  BIGINT,
+        channel  VARCHAR,
+        url  VARCHAR,
+        `dateTime`  TIMESTAMP(3),
+        extra  VARCHAR>,
+    `dateTime` AS
+        CASE
+            WHEN event_type = 0 THEN person.`dateTime`
+            WHEN event_type = 1 THEN auction.`dateTime`
+            ELSE bid.`dateTime`
+        END,
+    WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND
+) WITH (
+    'connector' = 'kafka',
+    'topic' = 'nexmark',
+    'properties.bootstrap.servers' = '${BOOTSTRAP_SERVERS}',
+    'properties.group.id' = 'nexmark',
+    'scan.startup.mode' = 'earliest-offset',
+    'sink.partitioner' = 'round-robin',
+    'format' = 'json'
+);
diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
index 54902b4b44..36f368dd92 100644
--- a/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
+++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
@@ -8,7 +8,7 @@ SELECT
     person.state,
     `dateTime`,
     person.extra
-FROM datagen WHERE event_type = 0;
+FROM ${NEXMARK_TABLE} WHERE event_type = 0;
 
 CREATE VIEW auction AS
 SELECT
@@ -22,7 +22,7 @@ SELECT
     auction.seller,
     auction.category,
     auction.extra
-FROM datagen WHERE event_type = 1;
+FROM ${NEXMARK_TABLE} WHERE event_type = 1;
 
 CREATE VIEW bid AS
 SELECT
@@ -33,4 +33,4 @@ SELECT
     bid.url,
     `dateTime`,
     bid.extra
-FROM datagen WHERE event_type = 2;
+FROM ${NEXMARK_TABLE} WHERE event_type = 2;

