This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e5138c941a [GLUTEN-9553][FLINK] Support kafka connector for data
source (#11312)
e5138c941a is described below
commit e5138c941ab111bac89e3e4bc0ba8534df2ba8f9
Author: kevinyhzou <[email protected]>
AuthorDate: Thu Dec 18 18:10:37 2025 +0800
[GLUTEN-9553][FLINK] Support kafka connector for data source (#11312)
* support kafka data source
* fix reviews
* optimize code
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
.../plan/nodes/exec/common/CommonExecSink.java | 6 +-
.../exec/stream/StreamExecTableSourceScan.java | 11 +-
...Builder.java => FromElementsSourceFactory.java} | 30 ++--
.../gluten/velox/KafkaSourceSinkFactory.java | 141 +++++++++++++++++
.../apache/gluten/velox/NexmarkSourceFactory.java | 103 +++++++++++++
...VeloxSinkBuilder.java => PrintSinkFactory.java} | 26 +++-
.../org.apache.gluten.velox.VeloxSourceSinkFactory | 4 +
.../SourceTransformationTranslator.java | 168 ---------------------
.../gluten/velox/VeloxSourceSinkFactory.java | 84 +++++++++++
gluten-flink/ut/pom.xml | 30 ++++
.../table/runtime/stream/custom/NexmarkTest.java | 102 +++++++++++--
.../ut/src/test/resources/nexmark/ddl_kafka.sql | 46 ++++++
.../ut/src/test/resources/nexmark/ddl_views.sql | 6 +-
15 files changed, 557 insertions(+), 204 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 4c0f1fdd4e..099ee73438 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -61,7 +61,7 @@ jobs:
sudo yum install
https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm
-y
sudo .github/workflows/util/install-flink-resources.sh
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard
14eea127c5088f972cdf1ca0987fd95429485a0e
+ cd velox4j && git reset --hard
1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index bc3e2ed092..a73b56bb32 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you
have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard 14eea127c5088f972cdf1ca0987fd95429485a0e
+git reset --hard 1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index f11d3f7958..a1b0e0e7b7 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -19,11 +19,12 @@ package
org.apache.flink.table.planner.plan.nodes.exec.common;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
-import org.apache.gluten.velox.VeloxSinkBuilder;
+import org.apache.gluten.velox.VeloxSourceSinkFactory;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
@@ -470,7 +471,8 @@ public abstract class CommonExecSink extends
ExecNodeBase<Object>
Transformation sinkTransformation =
createSinkFunctionTransformation(
sinkFunction, env, inputTransform, rowtimeFieldIndex,
sinkMeta, sinkParallelism);
- return VeloxSinkBuilder.build(env.getConfiguration(),
sinkTransformation);
+ return VeloxSourceSinkFactory.buildSink(
+ sinkTransformation, Map.of(Configuration.class.getName(),
env.getConfiguration()));
// --- End Gluten-specific code changes ---
} else if (runtimeProvider instanceof OutputFormatProvider) {
OutputFormat<RowData> outputFormat =
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 07189d5f5e..90b3981f0f 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -16,7 +16,7 @@
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
-import org.apache.gluten.velox.VeloxSourceBuilder;
+import org.apache.gluten.velox.VeloxSourceSinkFactory;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.io.InputFormat;
@@ -40,6 +40,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
+import java.util.Map;
/**
* Stream {@link ExecNode} to read data from an external source defined by a
{@link
@@ -106,7 +107,13 @@ public class StreamExecTableSourceScan extends
CommonExecTableSourceScan
.getScanTableSource(
planner.getFlinkContext(),
ShortcutUtils.unwrapTypeFactory(planner));
Transformation<RowData> sourceTransformation =
super.translateToPlanInternal(planner, config);
- return VeloxSourceBuilder.build(sourceTransformation, tableSource);
+ return VeloxSourceSinkFactory.buildSource(
+ sourceTransformation,
+ Map.of(
+ ScanTableSource.class.getName(),
+ tableSource,
+ "checkpoint.enabled",
+
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled()));
// --- End Gluten-specific code changes ---
}
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java
similarity index 81%
rename from
gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
rename to
gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java
index 7c56bef729..da31edeccd 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java
@@ -28,6 +28,8 @@ import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.plan.TableScanNode;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
@@ -40,25 +42,27 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-public class VeloxSourceBuilder {
+public class FromElementsSourceFactory implements VeloxSourceSinkFactory {
- public static Transformation<RowData> build(
- Transformation<RowData> transformation, ScanTableSource scanTableSource)
{
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean match(Transformation<RowData> transformation) {
if (transformation instanceof LegacySourceTransformation) {
- if
(scanTableSource.getClass().getSimpleName().equals("TestValuesScanLookupTableSource"))
{
- return buildFromElementsSource(transformation, scanTableSource);
- }
+ StreamSource source = ((LegacySourceTransformation)
transformation).getOperator();
+ return source.getUserFunction() instanceof FromElementsFunction;
}
- return transformation;
+ return false;
}
- /** `FromElementsSource` is designed for ut tests, and we map it to velox
source. */
@SuppressWarnings({"rawtypes", "unchecked"})
- private static Transformation<RowData> buildFromElementsSource(
- Transformation<RowData> transformation, ScanTableSource tableSource) {
+ @Override
+ public Transformation<RowData> buildVeloxSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
LegacySourceTransformation<RowData> sourceTransformation =
(LegacySourceTransformation<RowData>) transformation;
try {
+ ScanTableSource tableSource =
+ (ScanTableSource) parameters.get(ScanTableSource.class.getName());
Class<?> tableSourceClazz =
Class.forName(
"org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown");
@@ -106,4 +110,10 @@ public class VeloxSourceBuilder {
throw new FlinkRuntimeException(e);
}
}
+
+ @Override
+ public Transformation<RowData> buildVeloxSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ throw new FlinkRuntimeException("Unimplemented method 'buildSink'");
+ }
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
new file mode 100644
index 0000000000..54e2b8b436
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.KafkaConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.KafkaTableHandle;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.dag.Transformation;
+import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+public class KafkaSourceSinkFactory implements VeloxSourceSinkFactory {
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean match(Transformation<RowData> transformation) {
+ if (transformation instanceof SourceTransformation) {
+ Source source = ((SourceTransformation) transformation).getSource();
+ return source.getClass().getSimpleName().equals("KafkaSource");
+ }
+ return false;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public Transformation<RowData> buildVeloxSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ RowType outputType =
+ (RowType)
+ LogicalTypeConverter.toVLType(
+ ((InternalTypeInfo<?>)
transformation.getOutputType()).toLogicalType());
+ try {
+ ScanTableSource tableSource =
+ (ScanTableSource) parameters.get(ScanTableSource.class.getName());
+ boolean checkpointEnabled = (Boolean)
parameters.get("checkpoint.enabled");
+ Class<?> tableSourceClazz =
+
Class.forName("org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource");
+ Properties properties =
+ (Properties) ReflectUtils.getObjectField(tableSourceClazz,
tableSource, "properties");
+ List<String> topics =
+ (List<String>) ReflectUtils.getObjectField(tableSourceClazz,
tableSource, "topics");
+ DecodingFormat decodingFormat =
+ (DecodingFormat)
+ ReflectUtils.getObjectField(tableSourceClazz, tableSource,
"valueDecodingFormat");
+ String startupMode =
+ String.valueOf(ReflectUtils.getObjectField(tableSourceClazz,
tableSource, "startupMode"));
+ String connectorId = "connector-kafka";
+ String planId = PlanNodeIdGenerator.newId();
+ String topic = topics.get(0);
+ String format =
+ decodingFormat.getClass().getName().contains("JsonFormatFactory") ?
"json" : "raw";
+ Map<String, String> kafkaTableParameters = new HashMap<String, String>();
+ for (String key : properties.stringPropertyNames()) {
+ kafkaTableParameters.put(key, properties.getProperty(key));
+ }
+ kafkaTableParameters.put("topic", topic);
+ kafkaTableParameters.put("format", format);
+ kafkaTableParameters.put(
+ "scan.startup.mode",
+ startupMode.equals("LATEST")
+ ? "latest-offsets"
+ : startupMode.equals("EARLIEST") ? "earliest-offsets" :
"group-offsets");
+ kafkaTableParameters.put("enable.auto.commit", checkpointEnabled ?
"false" : "true");
+ kafkaTableParameters.put(
+ "client.id",
+ properties.getProperty("client.id.prefix", connectorId) + "-" +
UUID.randomUUID());
+ KafkaTableHandle kafkaTableHandle =
+ new KafkaTableHandle(connectorId, topic, outputType,
kafkaTableParameters);
+ KafkaConnectorSplit connectorSplit =
+ new KafkaConnectorSplit(
+ connectorId,
+ 0,
+ false,
+ kafkaTableParameters.get("bootstrap.servers"),
+ kafkaTableParameters.get("group.id"),
+ format,
+
Boolean.valueOf(kafkaTableParameters.getOrDefault("enable.auto.commit",
"false")),
+ "latest",
+ List.of());
+ TableScanNode kafkaScan = new TableScanNode(planId, outputType,
kafkaTableHandle, List.of());
+ GlutenStreamSource sourceOp =
+ new GlutenStreamSource(
+ new GlutenVectorSourceFunction(
+ new StatefulPlanNode(kafkaScan.getId(), kafkaScan),
+ Map.of(kafkaScan.getId(), outputType),
+ kafkaScan.getId(),
+ connectorSplit));
+ SourceTransformation sourceTransformation = (SourceTransformation)
transformation;
+ return new LegacySourceTransformation<RowData>(
+ sourceTransformation.getName(),
+ sourceOp,
+ transformation.getOutputType(),
+ sourceTransformation.getParallelism(),
+ sourceTransformation.getBoundedness(),
+ false);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Transformation<RowData> buildVeloxSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ throw new FlinkRuntimeException("Unimplemented method 'buildSink'");
+ }
+}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
new file mode 100644
index 0000000000..736f3cc3c7
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.dag.Transformation;
+import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public class NexmarkSourceFactory implements VeloxSourceSinkFactory {
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean match(Transformation<RowData> transformation) {
+ if (transformation instanceof SourceTransformation) {
+ Class<?> sourceClazz = ((SourceTransformation)
transformation).getSource().getClass();
+ return sourceClazz.getSimpleName().equals("NexmarkSource");
+ }
+ return false;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public Transformation<RowData> buildVeloxSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ RowType outputType =
+ (RowType)
+ LogicalTypeConverter.toVLType(
+ ((InternalTypeInfo)
transformation.getOutputType()).toLogicalType());
+ Object nexmarkSource = ((SourceTransformation) transformation).getSource();
+ String id = PlanNodeIdGenerator.newId();
+ List<?> nexmarkSourceSplits =
+ (List<?>)
+ ReflectUtils.invokeObjectMethod(
+ nexmarkSource.getClass(),
+ nexmarkSource,
+ "getSplits",
+ new Class<?>[] {int.class},
+ new Object[] {transformation.getParallelism()});
+ Object nexmarkSourceSplit = nexmarkSourceSplits.get(0);
+ Object generatorConfig =
+ ReflectUtils.getObjectField(
+ nexmarkSourceSplit.getClass(), nexmarkSourceSplit,
"generatorConfig");
+ Long maxEvents =
+ (Long)
+ ReflectUtils.getObjectField(generatorConfig.getClass(),
generatorConfig, "maxEvents");
+ PlanNode tableScan =
+ new TableScanNode(id, outputType, new
NexmarkTableHandle("connector-nexmark"), List.of());
+ GlutenStreamSource sourceOp =
+ new GlutenStreamSource(
+ new GlutenVectorSourceFunction(
+ new StatefulPlanNode(tableScan.getId(), tableScan),
+ Map.of(id, outputType),
+ id,
+ new NexmarkConnectorSplit(
+ "connector-nexmark",
+ maxEvents > Integer.MAX_VALUE ? Integer.MAX_VALUE :
maxEvents.intValue())));
+ return new LegacySourceTransformation<RowData>(
+ transformation.getName(),
+ sourceOp,
+ transformation.getOutputType(),
+ transformation.getParallelism(),
+ ((SourceTransformation) transformation).getBoundedness(),
+ false);
+ }
+
+ @Override
+ public Transformation<RowData> buildVeloxSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ throw new UnsupportedOperationException("Unimplemented method
'buildSink'");
+ }
+}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
similarity index 85%
rename from
gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
rename to
gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
index f27dec0a44..b00a76a21f 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
@@ -31,11 +31,12 @@ import io.github.zhztheplayer.velox4j.type.RowType;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.sink.SinkOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.FlinkRuntimeException;
@@ -43,9 +44,11 @@ import org.apache.flink.util.FlinkRuntimeException;
import java.util.List;
import java.util.Map;
-public class VeloxSinkBuilder {
+public class PrintSinkFactory implements VeloxSourceSinkFactory {
- public static Transformation build(ReadableConfig config, Transformation
transformation) {
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean match(Transformation<RowData> transformation) {
if (transformation instanceof LegacySinkTransformation) {
SimpleOperatorFactory operatorFactory =
(SimpleOperatorFactory) ((LegacySinkTransformation)
transformation).getOperatorFactory();
@@ -56,16 +59,25 @@ public class VeloxSinkBuilder {
.getClass()
.getSimpleName()
.equals("RowDataPrintFunction")) {
- return buildPrintSink(config, (LegacySinkTransformation)
transformation);
+ return true;
}
}
- return transformation;
+ return false;
}
- private static LegacySinkTransformation buildPrintSink(
- ReadableConfig config, LegacySinkTransformation transformation) {
+ @Override
+ public Transformation<RowData> buildVeloxSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ throw new FlinkRuntimeException("Unimplemented method 'buildSource'");
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public Transformation buildVeloxSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
Transformation inputTrans = (Transformation)
transformation.getInputs().get(0);
InternalTypeInfo inputTypeInfo = (InternalTypeInfo)
inputTrans.getOutputType();
+ Configuration config = (Configuration)
parameters.get(Configuration.class.getName());
String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
String printPath;
if (logDir != null) {
diff --git
a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
new file mode 100644
index 0000000000..9d7623b7ec
--- /dev/null
+++
b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
@@ -0,0 +1,4 @@
+org.apache.gluten.velox.FromElementsSourceFactory
+org.apache.gluten.velox.KafkaSourceSinkFactory
+org.apache.gluten.velox.PrintSinkFactory
+org.apache.gluten.velox.NexmarkSourceFactory
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
deleted file mode 100644
index 5e7dd62f0d..0000000000
---
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.translators;
-
-import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
-import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
-import org.apache.gluten.util.LogicalTypeConverter;
-import org.apache.gluten.util.PlanNodeIdGenerator;
-import org.apache.gluten.util.ReflectUtils;
-
-import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
-import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
-import io.github.zhztheplayer.velox4j.plan.PlanNode;
-import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.TableScanNode;
-import io.github.zhztheplayer.velox4j.type.RowType;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TransformationTranslator} for the {@link SourceTransformation}.
- *
- * @param <OUT> The type of the elements that this source produces.
- */
-@Internal
-public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit,
EnumChkT>
- extends SimpleTransformationTranslator<OUT, SourceTransformation<OUT,
SplitT, EnumChkT>> {
-
- @Override
- protected Collection<Integer> translateForBatchInternal(
- final SourceTransformation<OUT, SplitT, EnumChkT> transformation, final
Context context) {
-
- return translateInternal(
- transformation, context, false /* don't emit progressive watermarks
*/);
- }
-
- @Override
- protected Collection<Integer> translateForStreamingInternal(
- final SourceTransformation<OUT, SplitT, EnumChkT> transformation, final
Context context) {
-
- return translateInternal(transformation, context, true /* emit progressive
watermarks */);
- }
-
- private Collection<Integer> translateInternal(
- final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
- final Context context,
- boolean emitProgressiveWatermarks) {
- checkNotNull(transformation);
- checkNotNull(context);
-
- final StreamGraph streamGraph = context.getStreamGraph();
- final String slotSharingGroup = context.getSlotSharingGroup();
- final int transformationId = transformation.getId();
- final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
-
- // --- Begin Gluten-specific code changes ---
- Class<?> sourceClazz = transformation.getSource().getClass();
- if (sourceClazz.getSimpleName().equals("NexmarkSource")) {
- RowType outputType =
- (RowType)
- LogicalTypeConverter.toVLType(
- ((InternalTypeInfo)
transformation.getOutputType()).toLogicalType());
- String id = PlanNodeIdGenerator.newId();
- Object nexmarkSource = transformation.getSource();
- List<?> nexmarkSourceSplits =
- (List<?>)
- ReflectUtils.invokeObjectMethod(
- sourceClazz,
- nexmarkSource,
- "getSplits",
- new Class<?>[] {int.class},
- new Object[] {transformation.getParallelism()});
- Object nexmarkSourceSplit = nexmarkSourceSplits.get(0);
- Object generatorConfig =
- ReflectUtils.getObjectField(
- nexmarkSourceSplit.getClass(), nexmarkSourceSplit,
"generatorConfig");
- Long maxEvents =
- (Long)
- ReflectUtils.getObjectField(generatorConfig.getClass(),
generatorConfig, "maxEvents");
- PlanNode tableScan =
- new TableScanNode(id, outputType, new
NexmarkTableHandle("connector-nexmark"), List.of());
- StreamOperatorFactory<OUT> operatorFactory =
- SimpleOperatorFactory.of(
- new GlutenStreamSource(
- new GlutenVectorSourceFunction(
- new StatefulPlanNode(tableScan.getId(), tableScan),
- Map.of(id, outputType),
- id,
- new NexmarkConnectorSplit(
- "connector-nexmark",
- maxEvents > Integer.MAX_VALUE
- ? Integer.MAX_VALUE
- : maxEvents.intValue()))));
- streamGraph.addLegacySource(
- transformationId,
- slotSharingGroup,
- transformation.getCoLocationGroupKey(),
- operatorFactory,
- null,
- transformation.getOutputType(),
- "Source: " + transformation.getName());
- } else {
- SourceOperatorFactory<OUT> operatorFactory =
- new SourceOperatorFactory<>(
- transformation.getSource(),
- transformation.getWatermarkStrategy(),
- emitProgressiveWatermarks);
-
-
operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
-
operatorFactory.setCoordinatorListeningID(transformation.getCoordinatorListeningID());
-
- streamGraph.addSource(
- transformationId,
- slotSharingGroup,
- transformation.getCoLocationGroupKey(),
- operatorFactory,
- null,
- transformation.getOutputType(),
- "Source: " + transformation.getName());
- }
- // --- End Gluten-specific code changes ---
-
- final int parallelism =
- transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
- ? transformation.getParallelism()
- : executionConfig.getParallelism();
-
- streamGraph.setParallelism(
- transformationId, parallelism,
transformation.isParallelismConfigured());
- streamGraph.setMaxParallelism(transformationId,
transformation.getMaxParallelism());
-
- streamGraph.setSupportsConcurrentExecutionAttempts(
- transformationId,
transformation.isSupportsConcurrentExecutionAttempts());
-
- return Collections.singleton(transformationId);
- }
-}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
new file mode 100644
index 0000000000..aff1bb7795
--- /dev/null
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+
+public interface VeloxSourceSinkFactory {
+
+ static final Logger LOG =
LoggerFactory.getLogger(VeloxSourceSinkFactory.class);
+
+ /**
+  * Decides whether this factory can offload the given transformation to Velox.
+  *
+  * @param transformation the Flink transformation to inspect
+  * @return true if this factory should be used for the transformation
+  */
+ boolean match(Transformation<RowData> transformation);
+
+ /**
+  * Builds a source transformation that offloads the operator to Velox.
+  *
+  * @param transformation the original Flink source transformation
+  * @param parameters extra build options; their keys are not defined by this interface
+  * @return the transformation to use in place of the original one
+  */
+ Transformation<RowData> buildVeloxSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters);
+
+ /**
+  * Builds a sink transformation that offloads the operator to Velox.
+  *
+  * @param transformation the original Flink sink transformation
+  * @param parameters extra build options; their keys are not defined by this interface
+  * @return the transformation to use in place of the original one
+  */
+ Transformation<RowData> buildVeloxSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters);
+
+ /**
+  * Loads the available factories with a new {@link ServiceLoader} on every call and returns the
+  * first one whose {@link #match} accepts the transformation, or an empty Optional when none
+  * does.
+  */
+ private static Optional<VeloxSourceSinkFactory> getFactory(
+ Transformation<RowData> transformation) {
+ ServiceLoader<VeloxSourceSinkFactory> factories =
+ ServiceLoader.load(VeloxSourceSinkFactory.class);
+ for (VeloxSourceSinkFactory factory : factories) {
+ if (factory.match(transformation)) {
+ return Optional.of(factory);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+  * Builds the Velox source, or falls back to the original Flink source.
+  *
+  * <p>When no factory matches, a warning is logged and the given transformation is returned
+  * unchanged; otherwise the matching factory's buildVeloxSource result is returned.
+  */
+ static Transformation<RowData> buildSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ Optional<VeloxSourceSinkFactory> factory = getFactory(transformation);
+ if (factory.isEmpty()) {
+ LOG.warn(
+ "Not find matched factory to build velox source transformation, and
we will use flink original transformation {} instead.",
+ transformation.getClass().getName());
+ return transformation;
+ } else {
+ return factory.get().buildVeloxSource(transformation, parameters);
+ }
+ }
+
+ /**
+  * Builds the Velox sink, or falls back to the original Flink sink.
+  *
+  * <p>When no factory matches, a warning is logged and the given transformation is returned
+  * unchanged; otherwise the matching factory's buildVeloxSink result is returned.
+  */
+ static Transformation<RowData> buildSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ Optional<VeloxSourceSinkFactory> factory = getFactory(transformation);
+ if (factory.isEmpty()) {
+ LOG.warn(
+ "Not find matched factory to build velox sink transformation, and we
will use flink original transformation {} instead.",
+ transformation.getClass().getName());
+ return transformation;
+ } else {
+ return factory.get().buildVeloxSink(transformation, parameters);
+ }
+ }
+}
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index 3583f7823f..74311055a6 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -152,6 +152,36 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.salesforce.kafka.test</groupId>
+ <artifactId>kafka-junit5</artifactId>
+ <version>3.2.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>3.4.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka</artifactId>
+ <version>3.3.0-1.19</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index 5454b29396..476c4cba4d 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -18,13 +18,18 @@ package org.apache.gluten.table.runtime.stream.custom;
import org.apache.gluten.table.runtime.stream.common.Velox4jEnvironment;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
+import com.salesforce.kafka.test.listeners.PlainListener;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,9 +66,30 @@ public class NexmarkTest {
put("PERSON_PROPORTION", "1");
put("AUCTION_PROPORTION", "3");
put("BID_PROPORTION", "46");
+ put("NEXMARK_TABLE", "datagen");
}
};
+ // Port the embedded broker listens on; the bootstrap address below is derived from it.
+ private static final int KAFKA_PORT = 9092;
+ // Topic created by the Kafka test; ddl_kafka.sql reads from a topic with the same name.
+ private static final String topicName = "nexmark";
+
+ @RegisterExtension
+ public static final SharedKafkaTestResource kafkaInstance =
+     new SharedKafkaTestResource()
+         .withBrokers(1)
+         .registerListener(new PlainListener().onPorts(KAFKA_PORT));
+
+ // Values substituted into the ${...} placeholders of the DDL files for the Kafka-backed run.
+ private static final Map<String, String> KAFKA_VARIABLES =
+     new HashMap<>() {
+       {
+         // Built from KAFKA_PORT so the listener port and the client address cannot diverge.
+         put("BOOTSTRAP_SERVERS", "localhost:" + KAFKA_PORT);
+         put("NEXMARK_TABLE", "kafka");
+       }
+     };
+
+ // Views and functions that clearEnvironment drops at the end of each test.
+ private static final List<String> VIEWS = List.of("person", "auction", "bid", "B");
+ private static final List<String> FUNCTIONS = List.of("count_char");
+
private static StreamTableEnvironment tEnv;
@BeforeAll
@@ -76,31 +102,49 @@ public class NexmarkTest {
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().build();
tEnv = StreamTableEnvironment.create(env, settings);
-
- setupNexmarkEnvironment(tEnv);
}
@Test
- void testAllNexmarkQueries() throws ExecutionException,
InterruptedException, TimeoutException {
+ void testAllNexmarkSourceQueries()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ setupNexmarkEnvironment(tEnv, "ddl_gen.sql", NEXMARK_VARIABLES);
List<String> queryFiles = getQueries();
assertThat(queryFiles).isNotEmpty();
+ LOG.warn("Found {} Nexmark query files: {}", queryFiles.size(),
queryFiles);
+ for (String queryFile : queryFiles) {
+ LOG.warn("Executing nextmark query from file: {}", queryFile);
+ executeQuery(tEnv, queryFile, false);
+ }
+ clearEnvironment(tEnv);
+ }
+
+ @Test
+ void testAllKafkaSourceQueries()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ kafkaInstance.getKafkaTestUtils().createTopic(topicName, 1, (short) 1);
+ setupNexmarkEnvironment(tEnv, "ddl_kafka.sql", KAFKA_VARIABLES);
+ List<String> queryFiles = getQueries();
+ assertThat(queryFiles).isNotEmpty();
LOG.warn("Found {} Nexmark query files: {}", queryFiles.size(),
queryFiles);
for (String queryFile : queryFiles) {
- LOG.warn("Executing query from file: {}", queryFile);
- executeQuery(tEnv, queryFile);
+ LOG.warn("Executing kafka query from file:{}", queryFile);
+ executeQuery(tEnv, queryFile, true);
}
+ clearEnvironment(tEnv);
}
- private static void setupNexmarkEnvironment(StreamTableEnvironment tEnv) {
- String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR +
"/ddl_gen.sql");
- createNexmarkSource = replaceVariables(createNexmarkSource,
NEXMARK_VARIABLES);
+ private static void setupNexmarkEnvironment(
+ StreamTableEnvironment tEnv, String sourceFileName, Map<String, String>
variables) {
+ String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" +
sourceFileName);
+ createNexmarkSource = replaceVariables(createNexmarkSource, variables);
tEnv.executeSql(createNexmarkSource);
String createTableView = readSqlFromFile(NEXMARK_RESOURCE_DIR +
"/ddl_views.sql");
String[] sqlTableView = createTableView.split(";");
for (String sql : sqlTableView) {
+ sql = replaceVariables(sql, variables);
String trimmedSql = sql.trim();
if (!trimmedSql.isEmpty()) {
tEnv.executeSql(trimmedSql);
@@ -116,7 +160,23 @@ public class NexmarkTest {
return result;
}
- private void executeQuery(StreamTableEnvironment tEnv, String queryFileName)
+ /**
+  * Drops the tables nexmark_q0 through nexmark_q22, then every view named in VIEWS and every
+  * function named in FUNCTIONS, each with an "if exists" statement.
+  *
+  * @param tEnv table environment the statements are executed on
+  */
+ private static void clearEnvironment(StreamTableEnvironment tEnv) {
+   for (int queryNo = 0; queryNo <= 22; ++queryNo) {
+     tEnv.executeSql(String.format("drop table if exists %s", "nexmark_q" + queryNo));
+   }
+   VIEWS.forEach(
+       viewName -> tEnv.executeSql(String.format("drop view if exists %s", viewName)));
+   FUNCTIONS.forEach(
+       functionName ->
+           tEnv.executeSql(String.format("drop function if exists %s", functionName)));
+ }
+
+ private void executeQuery(StreamTableEnvironment tEnv, String queryFileName,
boolean kafkaSource)
throws ExecutionException, InterruptedException, TimeoutException {
String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" +
queryFileName);
@@ -136,7 +196,11 @@ public class NexmarkTest {
String insertQuery = sqlStatements[sqlStatements.length - 2].trim();
if (!insertQuery.isEmpty()) {
TableResult insertResult = tEnv.executeSql(insertQuery);
- waitForJobCompletion(insertResult, 30000);
+ if (kafkaSource) {
+ assertThat(checkJobRunningStatus(insertResult, 30000) == true);
+ } else {
+ waitForJobCompletion(insertResult, 30000);
+ }
}
assertTrue(sqlStatements[sqlStatements.length - 1].trim().isEmpty());
}
@@ -147,6 +211,24 @@ public class NexmarkTest {
result.getJobClient().get().getJobExecutionResult().get(timeoutMs,
TimeUnit.MILLISECONDS);
}
+ /**
+  * Polls the job status once per second until it is RUNNING or {@code timeoutMs} has elapsed.
+  * The job is cancelled in both cases.
+  *
+  * @param result result of the submitted statement; must carry a JobClient
+  * @param timeoutMs maximum time to keep polling, in milliseconds
+  * @return true if RUNNING was observed before the timeout, false otherwise; callers must assert
+  *     on this value explicitly
+  * @throws InterruptedException if interrupted while sleeping between polls
+  */
+ private boolean checkJobRunningStatus(TableResult result, long timeoutMs)
+     throws InterruptedException {
+   assertTrue(result.getJobClient().isPresent());
+   JobClient jobClient = result.getJobClient().get();
+   long deadline = System.currentTimeMillis() + timeoutMs;
+   while (System.currentTimeMillis() < deadline) {
+     // Read the status the cluster reports. Calling complete(JobStatus.RUNNING) on the future
+     // would only tell whether the future was still pending, not what state the job is in.
+     if (jobClient.getJobStatus().join() == JobStatus.RUNNING) {
+       jobClient.cancel();
+       return true;
+     }
+     Thread.sleep(1000);
+   }
+   LOG.warn("Job not running in " + timeoutMs + " millseconds.");
+   jobClient.cancel();
+   return false;
+ }
+
private List<String> getQueries() {
URL resourceUrl =
getClass().getClassLoader().getResource(NEXMARK_RESOURCE_DIR);
diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql
b/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql
new file mode 100644
index 0000000000..27757eaeaf
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql
@@ -0,0 +1,46 @@
+CREATE TABLE kafka (
+ event_type int,
+ person ROW<
+ id BIGINT,
+ name VARCHAR,
+ emailAddress VARCHAR,
+ creditCard VARCHAR,
+ city VARCHAR,
+ state VARCHAR,
+ `dateTime` TIMESTAMP(3),
+ extra VARCHAR>,
+ auction ROW<
+ id BIGINT,
+ itemName VARCHAR,
+ description VARCHAR,
+ initialBid BIGINT,
+ reserve BIGINT,
+ `dateTime` TIMESTAMP(3),
+ expires TIMESTAMP(3),
+ seller BIGINT,
+ category BIGINT,
+ extra VARCHAR>,
+ bid ROW<
+ auction BIGINT,
+ bidder BIGINT,
+ price BIGINT,
+ channel VARCHAR,
+ url VARCHAR,
+ `dateTime` TIMESTAMP(3),
+ extra VARCHAR>,
+ `dateTime` AS
+ CASE
+ WHEN event_type = 0 THEN person.`dateTime`
+ WHEN event_type = 1 THEN auction.`dateTime`
+ ELSE bid.`dateTime`
+ END,
+ WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'nexmark',
+ 'properties.bootstrap.servers' = '${BOOTSTRAP_SERVERS}',
+ 'properties.group.id' = 'nexmark',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'sink.partitioner' = 'round-robin',
+ 'format' = 'json'
+);
diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
index 54902b4b44..36f368dd92 100644
--- a/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
+++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
@@ -8,7 +8,7 @@ SELECT
person.state,
`dateTime`,
person.extra
-FROM datagen WHERE event_type = 0;
+FROM ${NEXMARK_TABLE} WHERE event_type = 0;
CREATE VIEW auction AS
SELECT
@@ -22,7 +22,7 @@ SELECT
auction.seller,
auction.category,
auction.extra
-FROM datagen WHERE event_type = 1;
+FROM ${NEXMARK_TABLE} WHERE event_type = 1;
CREATE VIEW bid AS
SELECT
@@ -33,4 +33,4 @@ SELECT
bid.url,
`dateTime`,
bid.extra
-FROM datagen WHERE event_type = 2;
+FROM ${NEXMARK_TABLE} WHERE event_type = 2;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]