This is an automated email from the ASF dual-hosted git repository.

lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4feac9bba3 [GLUTEN-10064][FLINK] Support filesystem sink (#11300)
4feac9bba3 is described below

commit 4feac9bba366e032c7ad50fccc603d6fe3c29f8f
Author: kevinyhzou <[email protected]>
AuthorDate: Thu Dec 25 10:33:50 2025 +0800

    [GLUTEN-10064][FLINK] Support filesystem sink (#11300)
    
    * support filesytstem sink
    
    * fix reviews
---
 .github/workflows/flink.yml                        |   2 +-
 gluten-flink/docs/Flink.md                         |   2 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  38 +-
 .../apache/gluten/velox/FileSystemSinkFactory.java | 161 ++++++
 .../gluten/velox/FuzzerSourceSinkFactory.java      | 149 ++++++
 .../org.apache.gluten.velox.VeloxSourceSinkFactory |   2 +
 gluten-flink/runtime/pom.xml                       |   6 +
 .../LegacySourceTransformationTranslator.java      | 137 ------
 .../translators/SinkTransformationTranslator.java  | 543 ---------------------
 .../api/operators/GlutenStreamSource.java          |  35 ++
 .../table/runtime/config/VeloxConnectorConfig.java |  58 +++
 .../runtime/operators/GlutenOneInputOperator.java  |   6 +-
 .../runtime/operators/GlutenSourceFunction.java    |   6 +-
 .../operators/GlutenVectorOneInputOperator.java    |   8 +-
 .../operators/GlutenVectorSourceFunction.java      |  19 +-
 .../operators/GlutenVectorTwoInputOperator.java    |   6 +-
 .../java/org/apache/gluten/util/ReflectUtils.java  |   9 +
 .../main/java/org/apache/gluten/util/Utils.java    |   9 +
 gluten-flink/ut/pom.xml                            |  12 +
 gluten-flink/ut/src/test/resources/nexmark/q10.sql |  23 +
 20 files changed, 514 insertions(+), 717 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 099ee73438..aefa159f9b 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -61,7 +61,7 @@ jobs:
           sudo yum install 
https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm
 -y
           sudo .github/workflows/util/install-flink-resources.sh
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 
1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c
+          cd velox4j && git reset --hard 
430d149837b9311df2787797fe0e0ec9abefd688
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index a73b56bb32..8424ccc190 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you 
have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c
+git reset --hard 430d149837b9311df2787797fe0e0ec9abefd688
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index a1b0e0e7b7..8dbac05783 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -207,16 +207,25 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
     if (targetRowKind.isPresent()) {
       sinkTransform = applyRowKindSetter(sinkTransform, targetRowKind.get(), 
config);
     }
-
-    return (Transformation<Object>)
-        applySinkProvider(
-            sinkTransform,
-            streamExecEnv,
-            runtimeProvider,
-            rowtimeFieldIndex,
-            sinkParallelism,
-            config,
-            classLoader);
+    // --- Begin Gluten-specific code changes ---
+    Transformation<Object> transformation =
+        (Transformation<Object>)
+            applySinkProvider(
+                sinkTransform,
+                streamExecEnv,
+                runtimeProvider,
+                rowtimeFieldIndex,
+                sinkParallelism,
+                config,
+                classLoader);
+    return VeloxSourceSinkFactory.buildSink(
+        (Transformation) transformation,
+        Map.of(
+            Configuration.class.getName(),
+            streamExecEnv.getConfiguration(),
+            ResolvedSchema.class.getName(),
+            schema));
+    // --- End Gluten-specific code changes ---
   }
 
   /** Apply an operator to filter or report error to process not-null values 
for not-null fields. */
@@ -467,13 +476,8 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
       } else if (runtimeProvider instanceof SinkFunctionProvider) {
         final SinkFunction<RowData> sinkFunction =
             ((SinkFunctionProvider) runtimeProvider).createSinkFunction();
-        // --- Begin Gluten-specific code changes ---
-        Transformation sinkTransformation =
-            createSinkFunctionTransformation(
-                sinkFunction, env, inputTransform, rowtimeFieldIndex, 
sinkMeta, sinkParallelism);
-        return VeloxSourceSinkFactory.buildSink(
-            sinkTransformation, Map.of(Configuration.class.getName(), 
env.getConfiguration()));
-        // --- End Gluten-specific code changes ---
+        return createSinkFunctionTransformation(
+            sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, 
sinkParallelism);
       } else if (runtimeProvider instanceof OutputFormatProvider) {
         OutputFormat<RowData> outputFormat =
             ((OutputFormatProvider) runtimeProvider).createOutputFormat();
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FileSystemSinkFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FileSystemSinkFactory.java
new file mode 100644
index 0000000000..657e321762
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FileSystemSinkFactory.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
+import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
+import io.github.zhztheplayer.velox4j.connector.FileSystemInsertTableHandle;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class FileSystemSinkFactory implements VeloxSourceSinkFactory {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
+    if (transformation instanceof SinkTransformation) {
+      SinkTransformation<RowData, RowData> sinkTransformation =
+          (SinkTransformation<RowData, RowData>) transformation;
+      Transformation<RowData> inputTransformation =
+          (Transformation<RowData>) sinkTransformation.getInputs().get(0);
+      if (inputTransformation instanceof OneInputTransformation
+          && inputTransformation.getName().equals("PartitionCommitter")) {
+        OneInputTransformation<RowData, RowData> oneInputTransformatin =
+            (OneInputTransformation<RowData, RowData>) inputTransformation;
+        Transformation<RowData> preInputTransformation =
+            (Transformation<RowData>) oneInputTransformatin.getInputs().get(0);
+        return preInputTransformation.getName().equals("StreamingFileWriter");
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    throw new UnsupportedOperationException("Unimplemented method 
'buildVeloxSource'");
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public Transformation<RowData> buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    SinkTransformation<RowData, RowData> sinkTransformation =
+        (SinkTransformation<RowData, RowData>) transformation;
+    OneInputTransformation<RowData, RowData> partitionCommitTransformation =
+        (OneInputTransformation<RowData, RowData>) 
sinkTransformation.getInputs().get(0);
+    OneInputTransformation<RowData, RowData> fileWriterTransformation =
+        (OneInputTransformation<RowData, RowData>) 
partitionCommitTransformation.getInputs().get(0);
+    OneInputStreamOperator<?, ?> operator = 
fileWriterTransformation.getOperator();
+    List<String> partitionKeys =
+        (List<String>) ReflectUtils.getObjectField(operator.getClass(), 
operator, "partitionKeys");
+    Map<String, String> tableParams = new HashMap<>();
+    Configuration tableOptions =
+        (Configuration)
+            ReflectUtils.getObjectField(
+                
"org.apache.flink.connector.file.table.stream.PartitionCommitter",
+                partitionCommitTransformation.getOperator(),
+                "conf");
+    tableParams.putAll(tableOptions.toMap());
+
+    ResolvedSchema schema = (ResolvedSchema) 
parameters.get(ResolvedSchema.class.getName());
+    List<String> columnList = schema.getColumnNames();
+    List<Integer> partitionIndexes =
+        
partitionKeys.stream().mapToInt(columnList::indexOf).boxed().collect(Collectors.toList());
+    org.apache.flink.table.types.logical.RowType inputType =
+        (org.apache.flink.table.types.logical.RowType)
+            ((InternalTypeInfo<?>) 
fileWriterTransformation.getInputType()).toLogicalType();
+    RowType inputDataColumns = (RowType) 
LogicalTypeConverter.toVLType(inputType);
+    FileSystemInsertTableHandle insertTableHandle =
+        new FileSystemInsertTableHandle(
+            fileWriterTransformation.getName(),
+            inputDataColumns,
+            partitionKeys,
+            partitionIndexes,
+            tableParams);
+    RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
+    TableWriteNode fileSystemWriteNode =
+        new TableWriteNode(
+            PlanNodeIdGenerator.newId(),
+            inputDataColumns,
+            inputDataColumns.getNames(),
+            null,
+            "connector-filesystem",
+            insertTableHandle,
+            false,
+            ignore,
+            CommitStrategy.NO_COMMIT,
+            List.of(new EmptyNode(inputDataColumns)));
+    GlutenOneInputOperator onewInputOperator =
+        new GlutenOneInputOperator(
+            new StatefulPlanNode(fileSystemWriteNode.getId(), 
fileSystemWriteNode),
+            PlanNodeIdGenerator.newId(),
+            inputDataColumns,
+            Map.of(fileSystemWriteNode.getId(), ignore));
+    GlutenOneInputOperatorFactory<?, ?> operatorFactory =
+        new GlutenOneInputOperatorFactory(onewInputOperator);
+    Transformation<RowData> veloxFileWriterTransformation =
+        new OneInputTransformation(
+            fileWriterTransformation.getInputs().get(0),
+            fileWriterTransformation.getName(),
+            operatorFactory,
+            fileWriterTransformation.getOutputType(),
+            fileWriterTransformation.getParallelism());
+    OneInputTransformation<RowData, RowData> newPartitionCommitTransformation =
+        new OneInputTransformation(
+            veloxFileWriterTransformation,
+            partitionCommitTransformation.getName(),
+            partitionCommitTransformation.getOperatorFactory(),
+            partitionCommitTransformation.getOutputType(),
+            partitionCommitTransformation.getParallelism());
+    DataStream<RowData> newInputStream =
+        new DataStream<RowData>(
+            sinkTransformation.getInputStream().getExecutionEnvironment(),
+            newPartitionCommitTransformation);
+    return new SinkTransformation<RowData, RowData>(
+        newInputStream,
+        sinkTransformation.getSink(),
+        sinkTransformation.getOutputType(),
+        sinkTransformation.getName(),
+        sinkTransformation.getParallelism(),
+        sinkTransformation.isParallelismConfigured(),
+        sinkTransformation.getSinkOperatorsUidHashes());
+  }
+}
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FuzzerSourceSinkFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FuzzerSourceSinkFactory.java
new file mode 100644
index 0000000000..16ac7f0831
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FuzzerSourceSinkFactory.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
+import io.github.zhztheplayer.velox4j.connector.DiscardDataTableHandle;
+import io.github.zhztheplayer.velox4j.connector.FuzzerConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.FuzzerTableHandle;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.dag.Transformation;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public class FuzzerSourceSinkFactory implements VeloxSourceSinkFactory {
+
+  @SuppressWarnings({"unchecked"})
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
+    if (transformation instanceof SinkTransformation) {
+      SinkTransformation<RowData, RowData> sinkTransformation =
+          (SinkTransformation<RowData, RowData>) transformation;
+      Transformation<?> inputTransformation = 
sinkTransformation.getInputs().get(0);
+      if (sinkTransformation.getSink() instanceof DiscardingSink
+          && !inputTransformation.getName().equals("PartitionCommitter")) {
+        return true;
+      }
+    } else if (transformation instanceof LegacySourceTransformation) {
+      Function userFunction =
+          ((LegacySourceTransformation<RowData>) 
transformation).getOperator().getUserFunction();
+      return userFunction instanceof DataGeneratorSource;
+    }
+    return false;
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    LegacySourceTransformation<RowData> sourceTransformation =
+        (LegacySourceTransformation<RowData>) transformation;
+    RowType outputType =
+        (RowType)
+            LogicalTypeConverter.toVLType(
+                ((InternalTypeInfo) 
sourceTransformation.getOutputType()).toLogicalType());
+    String id = PlanNodeIdGenerator.newId();
+    /// Create fuzzer table handle by 2 parameters: (1) a string as connector 
id; (2) a int as seed
+    FuzzerTableHandle tableHandle = new FuzzerTableHandle("connector-fuzzer", 
12367);
+    PlanNode tableScan = new TableScanNode(id, outputType, tableHandle, 
List.of());
+    GlutenStreamSource sourceOp =
+        new GlutenStreamSource(
+            new GlutenVectorSourceFunction(
+                new StatefulPlanNode(id, tableScan),
+                Map.of(id, outputType),
+                id,
+                new FuzzerConnectorSplit("connector-fuzzer", 1000)));
+    return new LegacySourceTransformation<RowData>(
+        sourceTransformation.getName(),
+        sourceOp,
+        sourceTransformation.getOutputType(),
+        sourceTransformation.getParallelism(),
+        sourceTransformation.getBoundedness(),
+        false);
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public Transformation<RowData> buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    SinkTransformation<RowData, RowData> sinkTransformation =
+        (SinkTransformation<RowData, RowData>) transformation;
+    RowType outputType =
+        (RowType)
+            LogicalTypeConverter.toVLType(
+                ((InternalTypeInfo) 
transformation.getOutputType()).toLogicalType());
+    // TODO: this is a constrain of velox.
+    // The result type should be ignored, as the data is written by velox,
+    // and no result need to return.
+    RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
+    PlanNode plan =
+        new TableWriteNode(
+            PlanNodeIdGenerator.newId(),
+            outputType,
+            outputType.getNames(),
+            null,
+            "connector-fuzzer",
+            new DiscardDataTableHandle(),
+            false,
+            ignore,
+            CommitStrategy.NO_COMMIT,
+            List.of(new EmptyNode(outputType)));
+    GlutenOneInputOperatorFactory operatorFactory =
+        new GlutenOneInputOperatorFactory(
+            new GlutenVectorOneInputOperator(
+                new StatefulPlanNode(plan.getId(), plan),
+                PlanNodeIdGenerator.newId(),
+                outputType,
+                Map.of(plan.getId(), ignore)));
+    DataStream<RowData> newInputStream =
+        sinkTransformation
+            .getInputStream()
+            .transform("Writer", CommittableMessageTypeInfo.noOutput(), 
operatorFactory);
+    return new SinkTransformation<RowData, RowData>(
+        newInputStream,
+        sinkTransformation.getSink(),
+        sinkTransformation.getOutputType(),
+        sinkTransformation.getName(),
+        sinkTransformation.getParallelism(),
+        sinkTransformation.isParallelismConfigured(),
+        sinkTransformation.getSinkOperatorsUidHashes());
+  }
+}
diff --git 
a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
 
b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
index 9d7623b7ec..b2d824ee12 100644
--- 
a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
+++ 
b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
@@ -2,3 +2,5 @@ org.apache.gluten.velox.FromElementsSourceFactory
 org.apache.gluten.velox.KafkaSourceSinkFactory
 org.apache.gluten.velox.PrintSinkFactory
 org.apache.gluten.velox.NexmarkSourceFactory
+org.apache.gluten.velox.FileSystemSinkFactory
+org.apache.gluten.velox.FuzzerSourceSinkFactory
diff --git a/gluten-flink/runtime/pom.xml b/gluten-flink/runtime/pom.xml
index 41e6cd414e..017bbf7736 100644
--- a/gluten-flink/runtime/pom.xml
+++ b/gluten-flink/runtime/pom.xml
@@ -57,6 +57,12 @@
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-files</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
deleted file mode 100644
index cba9057b24..0000000000
--- 
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.translators;
-
-import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
-import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
-import org.apache.gluten.util.LogicalTypeConverter;
-import org.apache.gluten.util.PlanNodeIdGenerator;
-
-import io.github.zhztheplayer.velox4j.connector.FuzzerConnectorSplit;
-import io.github.zhztheplayer.velox4j.connector.FuzzerTableHandle;
-import io.github.zhztheplayer.velox4j.plan.PlanNode;
-import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.TableScanNode;
-import io.github.zhztheplayer.velox4j.type.RowType;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
-import 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
-import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.InputFormatOperatorFactory;
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Gluten {@link TransformationTranslator} for the {@link 
LegacySourceTransformation}.
- *
- * @param <OUT> The type of the elements that the {@link 
LegacySourceTransformation} we are
- *     translating is producing.
- */
-@Internal
-public class LegacySourceTransformationTranslator<OUT>
-    extends SimpleTransformationTranslator<OUT, 
LegacySourceTransformation<OUT>> {
-
-  @Override
-  protected Collection<Integer> translateForBatchInternal(
-      final LegacySourceTransformation<OUT> transformation, final Context 
context) {
-    return translateInternal(transformation, context);
-  }
-
-  @Override
-  protected Collection<Integer> translateForStreamingInternal(
-      final LegacySourceTransformation<OUT> transformation, final Context 
context) {
-    return translateInternal(transformation, context);
-  }
-
-  private Collection<Integer> translateInternal(
-      final LegacySourceTransformation<OUT> transformation, final Context 
context) {
-    checkNotNull(transformation);
-    checkNotNull(context);
-
-    final StreamGraph streamGraph = context.getStreamGraph();
-    final String slotSharingGroup = context.getSlotSharingGroup();
-    final int transformationId = transformation.getId();
-    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
-
-    Function userFunction = transformation.getOperator().getUserFunction();
-    StreamOperatorFactory<OUT> operatorFactory;
-    String namePrefix = "";
-    // --- Begin Gluten-specific code changes ---
-    if (userFunction instanceof DataGeneratorSource) {
-      RowType outputType =
-          (RowType)
-              LogicalTypeConverter.toVLType(
-                  ((InternalTypeInfo) 
transformation.getOutputType()).toLogicalType());
-      String id = PlanNodeIdGenerator.newId();
-      PlanNode tableScan =
-          new TableScanNode(
-              id, outputType, new FuzzerTableHandle("connector-fuzzer", 
12367), List.of());
-      operatorFactory =
-          SimpleOperatorFactory.of(
-              new GlutenStreamSource(
-                  new GlutenVectorSourceFunction(
-                      new StatefulPlanNode(id, tableScan),
-                      Map.of(id, outputType),
-                      id,
-                      new FuzzerConnectorSplit("connector-fuzzer", 1000))));
-      namePrefix = "Gluten ";
-    } else {
-      operatorFactory = transformation.getOperatorFactory();
-    }
-    // --- End Gluten-specific code changes ---
-    streamGraph.addLegacySource(
-        transformationId,
-        slotSharingGroup,
-        transformation.getCoLocationGroupKey(),
-        operatorFactory,
-        null,
-        transformation.getOutputType(),
-        namePrefix + "Source: " + transformation.getName());
-
-    if (transformation.getOperatorFactory() instanceof 
InputFormatOperatorFactory) {
-      streamGraph.setInputFormat(
-          transformationId,
-          ((InputFormatOperatorFactory<OUT>) 
transformation.getOperatorFactory()).getInputFormat());
-    }
-
-    final int parallelism =
-        transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
-            ? transformation.getParallelism()
-            : executionConfig.getParallelism();
-    streamGraph.setParallelism(
-        transformationId, parallelism, 
transformation.isParallelismConfigured());
-    streamGraph.setMaxParallelism(transformationId, 
transformation.getMaxParallelism());
-
-    streamGraph.setSupportsConcurrentExecutionAttempts(
-        transformationId, 
transformation.isSupportsConcurrentExecutionAttempts());
-
-    return Collections.singleton(transformationId);
-  }
-}
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
deleted file mode 100644
index 2bebc93af7..0000000000
--- 
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ /dev/null
@@ -1,543 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.translators;
-
-import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
-import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
-import org.apache.gluten.util.LogicalTypeConverter;
-import org.apache.gluten.util.PlanNodeIdGenerator;
-
-import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
-import io.github.zhztheplayer.velox4j.connector.DiscardDataTableHandle;
-import io.github.zhztheplayer.velox4j.plan.EmptyNode;
-import io.github.zhztheplayer.velox4j.plan.PlanNode;
-import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
-import io.github.zhztheplayer.velox4j.type.BigIntType;
-import io.github.zhztheplayer.velox4j.type.RowType;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
-import org.apache.flink.api.common.operators.SlotSharingGroup;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
-import 
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
-import 
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
-import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
-import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
-import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.Set;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for 
the {@link
- * org.apache.flink.streaming.api.transformations.SinkTransformation}.
- */
-@Internal
-public class SinkTransformationTranslator<Input, Output>
-    implements TransformationTranslator<Output, SinkTransformation<Input, 
Output>> {
-
-  private static final String COMMITTER_NAME = "Committer";
-  private static final String WRITER_NAME = "Writer";
-
-  @Override
-  public Collection<Integer> translateForBatch(
-      SinkTransformation<Input, Output> transformation, Context context) {
-    return translateInternal(transformation, context, true);
-  }
-
-  @Override
-  public Collection<Integer> translateForStreaming(
-      SinkTransformation<Input, Output> transformation, Context context) {
-    return translateInternal(transformation, context, false);
-  }
-
-  private Collection<Integer> translateInternal(
-      SinkTransformation<Input, Output> transformation, Context context, 
boolean batch) {
-    SinkExpander<Input> expander =
-        new SinkExpander<>(
-            transformation.getInputStream(),
-            transformation.getSink(),
-            transformation,
-            context,
-            batch);
-    expander.expand();
-    return Collections.emptyList();
-  }
-
-  /**
-   * Expands the FLIP-143 Sink to a subtopology. Each part of the topology is 
created after the
-   * previous part of the topology has been completely configured by the user. 
For example, if a
-   * user explicitly sets the parallelism of the sink, each part of the 
subtopology can rely on the
-   * input having that parallelism.
-   */
-  private static class SinkExpander<T> {
-    private final SinkTransformation<T, ?> transformation;
-    private final Sink<T> sink;
-    private final Context context;
-    private final DataStream<T> inputStream;
-    private final StreamExecutionEnvironment executionEnvironment;
-    private final Optional<Integer> environmentParallelism;
-    private final boolean isBatchMode;
-    private final boolean isCheckpointingEnabled;
-
-    public SinkExpander(
-        DataStream<T> inputStream,
-        Sink<T> sink,
-        SinkTransformation<T, ?> transformation,
-        Context context,
-        boolean isBatchMode) {
-      this.inputStream = inputStream;
-      this.executionEnvironment = inputStream.getExecutionEnvironment();
-      this.environmentParallelism =
-          executionEnvironment
-              .getConfig()
-              .toConfiguration()
-              .getOptional(CoreOptions.DEFAULT_PARALLELISM);
-      this.isCheckpointingEnabled =
-          executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
-      this.transformation = transformation;
-      this.sink = sink;
-      this.context = context;
-      this.isBatchMode = isBatchMode;
-    }
-
-    private void expand() {
-
-      final int sizeBefore = executionEnvironment.getTransformations().size();
-
-      DataStream<T> prewritten = inputStream;
-
-      if (sink instanceof SupportsPreWriteTopology) {
-        prewritten =
-            adjustTransformations(
-                prewritten,
-                ((SupportsPreWriteTopology<T>) sink)::addPreWriteTopology,
-                true,
-                sink instanceof SupportsConcurrentExecutionAttempts);
-      }
-
-      if (sink instanceof SupportsPreCommitTopology) {
-        Preconditions.checkArgument(
-            sink instanceof SupportsCommitter,
-            "Sink with SupportsPreCommitTopology should implement 
SupportsCommitter");
-      }
-      if (sink instanceof SupportsPostCommitTopology) {
-        Preconditions.checkArgument(
-            sink instanceof SupportsCommitter,
-            "Sink with SupportsPostCommitTopology should implement 
SupportsCommitter");
-      }
-
-      if (sink instanceof SupportsCommitter) {
-        addCommittingTopology(sink, prewritten);
-      } else {
-        // --- Begin Gluten-specific code changes ---
-        if (sink instanceof DiscardingSink) {
-          RowType outputType =
-              (RowType)
-                  LogicalTypeConverter.toVLType(
-                      ((InternalTypeInfo) 
transformation.getOutputType()).toLogicalType());
-          // TODO: this is a constrain of velox.
-          // The result type should be ignored, as the data is written by 
velox,
-          // and no result need to return.
-          RowType ignore = new RowType(List.of("num"), List.of(new 
BigIntType()));
-          PlanNode plan =
-              new TableWriteNode(
-                  PlanNodeIdGenerator.newId(),
-                  outputType,
-                  outputType.getNames(),
-                  null,
-                  "connector-fuzzer",
-                  new DiscardDataTableHandle(),
-                  false,
-                  ignore,
-                  CommitStrategy.NO_COMMIT,
-                  List.of(new EmptyNode(outputType)));
-          adjustTransformations(
-              prewritten,
-              input ->
-                  input.transform(
-                      WRITER_NAME,
-                      CommittableMessageTypeInfo.noOutput(),
-                      new GlutenOneInputOperatorFactory(
-                          new GlutenVectorOneInputOperator(
-                              new StatefulPlanNode(plan.getId(), plan),
-                              PlanNodeIdGenerator.newId(),
-                              outputType,
-                              Map.of(plan.getId(), ignore)))),
-              false,
-              sink instanceof SupportsConcurrentExecutionAttempts);
-        } else {
-          adjustTransformations(
-              prewritten,
-              input ->
-                  input.transform(
-                      WRITER_NAME,
-                      CommittableMessageTypeInfo.noOutput(),
-                      new SinkWriterOperatorFactory<>(sink)),
-              false,
-              sink instanceof SupportsConcurrentExecutionAttempts);
-        }
-        // --- End Gluten-specific code changes ---
-      }
-
-      getSinkTransformations(sizeBefore).forEach(context::transform);
-
-      disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore));
-
-      // Remove all added sink subtransformations to avoid duplications and 
allow additional
-      // expansions
-      while (executionEnvironment.getTransformations().size() > sizeBefore) {
-        executionEnvironment
-            .getTransformations()
-            .remove(executionEnvironment.getTransformations().size() - 1);
-      }
-    }
-
-    private List<Transformation<?>> getSinkTransformations(int sizeBefore) {
-      return executionEnvironment
-          .getTransformations()
-          .subList(sizeBefore, 
executionEnvironment.getTransformations().size());
-    }
-
-    /**
-     * Disables UC for all connections of operators within the sink expansion. 
This is necessary
-     * because committables need to be at the respective operators on 
notifyCheckpointComplete or
-     * else we can't commit all side-effects, which violates the contract of
-     * notifyCheckpointComplete.
-     */
-    private void disallowUnalignedCheckpoint(List<Transformation<?>> 
sinkTransformations) {
-      Optional<Transformation<?>> writerOpt =
-          
sinkTransformations.stream().filter(SinkExpander::isWriter).findFirst();
-      Preconditions.checkState(writerOpt.isPresent(), "Writer transformation 
not found.");
-      Transformation<?> writer = writerOpt.get();
-      int indexOfWriter = sinkTransformations.indexOf(writer);
-
-      // check all transformation after the writer and recursively disable UC 
for all inputs
-      // up to the writer
-      Set<Integer> seen = new HashSet<>(writer.getId());
-      Queue<Transformation<?>> pending =
-          new ArrayDeque<>(
-              sinkTransformations.subList(indexOfWriter + 1, 
sinkTransformations.size()));
-
-      while (!pending.isEmpty()) {
-        Transformation<?> current = pending.poll();
-        seen.add(current.getId());
-
-        for (Transformation<?> input : current.getInputs()) {
-          if (input instanceof PartitionTransformation) {
-            ((PartitionTransformation<?>) 
input).getPartitioner().disableUnalignedCheckpoints();
-          }
-          if (seen.add(input.getId())) {
-            pending.add(input);
-          }
-        }
-      }
-    }
-
-    private static boolean isWriter(Transformation<?> t) {
-      if (!(t instanceof OneInputTransformation)) {
-        return false;
-      }
-      StreamOperatorFactory operatorFactory =
-          ((OneInputTransformation<?, ?>) t).getOperatorFactory();
-      return operatorFactory instanceof SinkWriterOperatorFactory
-          || operatorFactory instanceof GlutenOneInputOperatorFactory;
-    }
-
-    private <CommT, WriteResultT> void addCommittingTopology(
-        Sink<T> sink, DataStream<T> inputStream) {
-      SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) 
sink;
-      TypeInformation<CommittableMessage<CommT>> committableTypeInformation =
-          
CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
-
-      DataStream<CommittableMessage<CommT>> precommitted;
-      if (sink instanceof SupportsPreCommitTopology) {
-        SupportsPreCommitTopology<WriteResultT, CommT> preCommittingSink =
-            (SupportsPreCommitTopology<WriteResultT, CommT>) sink;
-        TypeInformation<CommittableMessage<WriteResultT>> 
writeResultTypeInformation =
-            
CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);
-
-        DataStream<CommittableMessage<WriteResultT>> writerResult =
-            addWriter(sink, inputStream, writeResultTypeInformation);
-
-        precommitted =
-            adjustTransformations(
-                writerResult, preCommittingSink::addPreCommitTopology, true, 
false);
-      } else {
-        precommitted = addWriter(sink, inputStream, 
committableTypeInformation);
-      }
-
-      DataStream<CommittableMessage<CommT>> committed =
-          adjustTransformations(
-              precommitted,
-              pc ->
-                  pc.transform(
-                      COMMITTER_NAME,
-                      committableTypeInformation,
-                      new CommitterOperatorFactory<>(
-                          committingSink, isBatchMode, 
isCheckpointingEnabled)),
-              false,
-              false);
-
-      if (sink instanceof SupportsPostCommitTopology) {
-        DataStream<CommittableMessage<CommT>> postcommitted = 
addFailOverRegion(committed);
-        adjustTransformations(
-            postcommitted,
-            pc -> {
-              ((SupportsPostCommitTopology<CommT>) 
sink).addPostCommitTopology(pc);
-              return null;
-            },
-            true,
-            false);
-      }
-    }
-
-    private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> 
addWriter(
-        Sink<T> sink,
-        DataStream<T> inputStream,
-        TypeInformation<CommittableMessage<WriteResultT>> typeInformation) {
-      DataStream<CommittableMessage<WriteResultT>> written =
-          adjustTransformations(
-              inputStream,
-              input ->
-                  input.transform(
-                      WRITER_NAME, typeInformation, new 
SinkWriterOperatorFactory<>(sink)),
-              false,
-              sink instanceof SupportsConcurrentExecutionAttempts);
-
-      return addFailOverRegion(written);
-    }
-
-    /** Adds a batch exchange that materializes the output first. This is a 
no-op in STREAMING. */
-    private <I> DataStream<I> addFailOverRegion(DataStream<I> input) {
-      return new DataStream<>(
-          executionEnvironment,
-          new PartitionTransformation<>(
-              input.getTransformation(), new ForwardPartitioner<>(), 
StreamExchangeMode.BATCH));
-    }
-
-    /**
-     * Since user may set specific parallelism on sub topologies, we have to 
pay attention to the
-     * priority of parallelism at different levels, i.e. sub topologies 
customized parallelism >
-     * sinkTransformation customized parallelism > environment customized 
parallelism. In order to
-     * satisfy this rule and keep these customized parallelism values, the 
environment parallelism
-     * will be set to be {@link ExecutionConfig#PARALLELISM_DEFAULT} before 
adjusting
-     * transformations. SubTransformations, constructed after that, will have 
either the default
-     * value or customized value. In this way, any customized value will be 
discriminated from the
-     * default value and, for any subTransformation with the default 
parallelism value, we will then
-     * be able to let it inherit the parallelism value from the previous 
sinkTransformation. After
-     * the adjustment of transformations is closed, the environment 
parallelism will be restored
-     * back to its original value to keep the customized parallelism value at 
environment level.
-     */
-    private <I, R> R adjustTransformations(
-        DataStream<I> inputStream,
-        Function<DataStream<I>, R> action,
-        boolean isExpandedTopology,
-        boolean supportsConcurrentExecutionAttempts) {
-
-      // Reset the environment parallelism temporarily before adjusting 
transformations,
-      // we can therefore be aware of any customized parallelism of the sub 
topology
-      // set by users during the adjustment.
-      executionEnvironment.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
-
-      int numTransformsBefore = 
executionEnvironment.getTransformations().size();
-      R result = action.apply(inputStream);
-      List<Transformation<?>> transformations = 
executionEnvironment.getTransformations();
-      List<Transformation<?>> expandedTransformations =
-          transformations.subList(numTransformsBefore, transformations.size());
-
-      final CustomSinkOperatorUidHashes operatorsUidHashes =
-          transformation.getSinkOperatorsUidHashes();
-      for (Transformation<?> subTransformation : expandedTransformations) {
-
-        String subUid = subTransformation.getUid();
-        if (isExpandedTopology && subUid != null && !subUid.isEmpty()) {
-          checkState(
-              transformation.getUid() != null && 
!transformation.getUid().isEmpty(),
-              "Sink "
-                  + transformation.getName()
-                  + " requires to set a uid since its customized topology"
-                  + " has set uid for some operators.");
-        }
-
-        // Set the operator uid hashes to support stateful upgrades without 
prior uids
-        setOperatorUidHashIfPossible(
-            subTransformation, WRITER_NAME, 
operatorsUidHashes.getWriterUidHash());
-        setOperatorUidHashIfPossible(
-            subTransformation, COMMITTER_NAME, 
operatorsUidHashes.getCommitterUidHash());
-        setOperatorUidHashIfPossible(
-            subTransformation,
-            StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME,
-            operatorsUidHashes.getGlobalCommitterUidHash());
-
-        concatUid(
-            subTransformation,
-            Transformation::getUid,
-            Transformation::setUid,
-            subTransformation.getName());
-
-        concatProperty(
-            subTransformation,
-            Transformation::getCoLocationGroupKey,
-            Transformation::setCoLocationGroupKey);
-
-        concatProperty(subTransformation, Transformation::getName, 
Transformation::setName);
-
-        concatProperty(
-            subTransformation, Transformation::getDescription, 
Transformation::setDescription);
-
-        // handle coLocationGroupKey.
-        String coLocationGroupKey = transformation.getCoLocationGroupKey();
-        if (coLocationGroupKey != null && 
subTransformation.getCoLocationGroupKey() == null) {
-          subTransformation.setCoLocationGroupKey(coLocationGroupKey);
-        }
-
-        Optional<SlotSharingGroup> ssg = transformation.getSlotSharingGroup();
-
-        if (ssg.isPresent() && 
!subTransformation.getSlotSharingGroup().isPresent()) {
-          subTransformation.setSlotSharingGroup(ssg.get());
-        }
-
-        // remember that the environment parallelism has been set to be default
-        // at the beginning. SubTransformations, whose parallelism has been
-        // customized, will skip this part. The customized parallelism value 
set by user
-        // will therefore be kept.
-        if (subTransformation.getParallelism() == 
ExecutionConfig.PARALLELISM_DEFAULT) {
-          // In this case, the subTransformation does not contain any 
customized
-          // parallelism value and will therefore inherit the parallelism value
-          // from the sinkTransformation.
-          subTransformation.setParallelism(
-              transformation.getParallelism(), 
transformation.isParallelismConfigured());
-        }
-
-        if (subTransformation.getMaxParallelism() < 0 && 
transformation.getMaxParallelism() > 0) {
-          
subTransformation.setMaxParallelism(transformation.getMaxParallelism());
-        }
-
-        if (subTransformation instanceof PhysicalTransformation) {
-          PhysicalTransformation<?> physicalSubTransformation =
-              (PhysicalTransformation<?>) subTransformation;
-
-          if (transformation.getChainingStrategy() != null) {
-            
physicalSubTransformation.setChainingStrategy(transformation.getChainingStrategy());
-          }
-
-          // overrides the supportsConcurrentExecutionAttempts of 
transformation because
-          // it's not allowed to specify fine-grained concurrent execution 
attempts yet
-          physicalSubTransformation.setSupportsConcurrentExecutionAttempts(
-              supportsConcurrentExecutionAttempts);
-        }
-      }
-
-      // Restore the previous parallelism of the environment before adjusting 
transformations
-      if (environmentParallelism.isPresent()) {
-        
executionEnvironment.getConfig().setParallelism(environmentParallelism.get());
-      } else {
-        executionEnvironment.getConfig().resetParallelism();
-      }
-
-      return result;
-    }
-
-    private void setOperatorUidHashIfPossible(
-        Transformation<?> transformation, String writerName, @Nullable String 
operatorUidHash) {
-      if (operatorUidHash == null || 
!transformation.getName().equals(writerName)) {
-        return;
-      }
-      transformation.setUidHash(operatorUidHash);
-    }
-
-    private void concatUid(
-        Transformation<?> subTransformation,
-        Function<Transformation<?>, String> getter,
-        BiConsumer<Transformation<?>, String> setter,
-        @Nullable String transformationName) {
-      if (transformationName != null && getter.apply(transformation) != null) {
-        // Use the same uid pattern than for Sink V1. We deliberately decided 
to use the uid
-        // pattern of Flink 1.13 because 1.14 did not have a dedicated 
committer operator.
-        if (transformationName.equals(COMMITTER_NAME)) {
-          final String committerFormat = "Sink Committer: %s";
-          setter.accept(
-              subTransformation, String.format(committerFormat, 
getter.apply(transformation)));
-          return;
-        }
-        // Set the writer operator uid to the sinks uid to support state 
migrations
-        if (transformationName.equals(WRITER_NAME)) {
-          setter.accept(subTransformation, getter.apply(transformation));
-          return;
-        }
-
-        // Use the same uid pattern than for Sink V1 in Flink 1.14.
-        if (transformationName.equals(
-            StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
-          final String committerFormat = "Sink %s Global Committer";
-          setter.accept(
-              subTransformation, String.format(committerFormat, 
getter.apply(transformation)));
-          return;
-        }
-      }
-      concatProperty(subTransformation, getter, setter);
-    }
-
-    private void concatProperty(
-        Transformation<?> subTransformation,
-        Function<Transformation<?>, String> getter,
-        BiConsumer<Transformation<?>, String> setter) {
-      if (getter.apply(transformation) != null && 
getter.apply(subTransformation) != null) {
-        setter.accept(
-            subTransformation,
-            getter.apply(transformation) + ": " + 
getter.apply(subTransformation));
-      }
-    }
-  }
-}
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenStreamSource.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenStreamSource.java
index 7ffe20e7df..0281def229 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenStreamSource.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenStreamSource.java
@@ -17,11 +17,15 @@
 package org.apache.gluten.streaming.api.operators;
 
 import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.ReflectUtils;
+import org.apache.gluten.util.Utils;
 
 import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.type.RowType;
 
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 
 import java.util.Map;
@@ -63,4 +67,35 @@ public class GlutenStreamSource extends StreamSource 
implements GlutenOperator {
   public ConnectorSplit getConnectorSplit() {
     return sourceFunction.getConnectorSplit();
   }
+
+  @SuppressWarnings("rawtypes")
+  private SourceFunction.SourceContext getSourceContext() {
+    return (SourceFunction.SourceContext)
+        ReflectUtils.getObjectField(StreamSource.class, this, "ctx");
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    super.notifyCheckpointComplete(checkpointId);
+    String[] committed = sourceFunction.notifyCheckpointComplete(checkpointId);
+    SourceFunction.SourceContext sourceContext = getSourceContext();
+    TaskInfo taskInfo = getRuntimeContext().getTaskInfo();
+    if (sourceContext != null
+        && committed != null
+        && taskInfo.getTaskName().contains("StreamingFileWriter")) {
+      sourceContext.collect(
+          Utils.constructCommitInfo(
+              checkpointId,
+              taskInfo.getIndexOfThisSubtask(),
+              taskInfo.getNumberOfParallelSubtasks(),
+              committed));
+    }
+  }
+
+  @Override
+  public void notifyCheckpointAborted(long checkpointId) throws Exception {
+    super.notifyCheckpointAborted(checkpointId);
+    sourceFunction.notifyCheckpointAborted(checkpointId);
+  }
 }
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
new file mode 100644
index 0000000000..13b195b0bd
--- /dev/null
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.config;
+
+import io.github.zhztheplayer.velox4j.config.Config;
+import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class VeloxConnectorConfig {
+
+  private static final List<String> CONNECTORS =
+      List.of(
+          "connector-nexmark",
+          "connector-kafka",
+          "connector-fuzzer",
+          "connector-filesystem",
+          "connector-from-elements",
+          "connector-print");
+  private static final String keyTaskIndex = "task_index";
+  private static final String keyQueryUUId = "query_uuid";
+
+  public static ConnectorConfig getConfig(RuntimeContext context) {
+    Map<String, String> configMap = new HashMap<>();
+    TaskInfo taskInfo = context.getTaskInfo();
+    configMap.put(keyTaskIndex, 
String.valueOf(taskInfo.getIndexOfThisSubtask()));
+    configMap.put(
+        keyQueryUUId,
+        
UUID.nameUUIDFromBytes(context.getJobInfo().getJobId().toHexString().getBytes())
+            .toString());
+    Config commonConfig = Config.create(configMap);
+    Map<String, Config> connectorConfigMap = new HashMap<>();
+    for (String connectorId : CONNECTORS) {
+      connectorConfigMap.put(connectorId, commonConfig);
+    }
+    return ConnectorConfig.create(connectorConfigMap);
+  }
+}
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index d8f6f1a0b2..09af2f4576 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -17,11 +17,11 @@
 package org.apache.gluten.table.runtime.operators;
 
 import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
 import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
@@ -101,7 +101,9 @@ public class GlutenOneInputOperator extends 
TableStreamOperator<RowData>
     LOG.debug("OutTypes: {}", outputTypes.keySet());
     query =
         new Query(
-            mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
+            mockInput,
+            VeloxQueryConfig.getConfig(getRuntimeContext()),
+            VeloxConnectorConfig.getConfig(getRuntimeContext()));
     allocator = new RootAllocator(Long.MAX_VALUE);
     task = session.queryOps().execute(query);
     ExternalStreamConnectorSplit split =
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 360eb39a84..54c33f5159 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -16,11 +16,11 @@
  */
 package org.apache.gluten.table.runtime.operators;
 
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
 import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
 import io.github.zhztheplayer.velox4j.data.RowVector;
 import io.github.zhztheplayer.velox4j.iterator.UpIterator;
@@ -94,7 +94,9 @@ public class GlutenSourceFunction extends 
RichParallelSourceFunction<RowData> {
     session = Velox4j.newSession(memoryManager);
     query =
         new Query(
-            planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
+            planNode,
+            VeloxQueryConfig.getConfig(getRuntimeContext()),
+            VeloxConnectorConfig.getConfig(getRuntimeContext()));
     allocator = new RootAllocator(Long.MAX_VALUE);
 
     SerialTask task = session.queryOps().execute(query);
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
index e859e15ca3..b55ebb6868 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
@@ -17,10 +17,10 @@
 package org.apache.gluten.table.runtime.operators;
 
 import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
 import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
@@ -69,7 +69,7 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
   private Session session;
   private Query query;
   private ExternalStreams.BlockingQueue inputQueue;
-  private SerialTask task;
+  protected SerialTask task;
 
   public GlutenVectorOneInputOperator(
       StatefulPlanNode plan, String id, RowType inputType, Map<String, 
RowType> outputTypes) {
@@ -96,7 +96,9 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
     LOG.debug("OutTypes: {}", outputTypes.keySet());
     query =
         new Query(
-            mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
+            mockInput,
+            VeloxQueryConfig.getConfig(getRuntimeContext()),
+            VeloxConnectorConfig.getConfig(getRuntimeContext()));
     task = session.queryOps().execute(query);
   }
 
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 472bd0bfed..67bc802946 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -16,11 +16,11 @@
  */
 package org.apache.gluten.table.runtime.operators;
 
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
 import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
 import io.github.zhztheplayer.velox4j.iterator.UpIterator;
 import io.github.zhztheplayer.velox4j.memory.AllocationListener;
@@ -33,7 +33,6 @@ import io.github.zhztheplayer.velox4j.session.Session;
 import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
 import io.github.zhztheplayer.velox4j.type.RowType;
 
-import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -53,7 +52,7 @@ import java.util.Map;
  * instead of RowData to avoid data convert.
  */
 public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<StatefulElement>
-    implements CheckpointedFunction, CheckpointListener {
+    implements CheckpointedFunction {
   private static final Logger LOG = 
LoggerFactory.getLogger(GlutenVectorSourceFunction.class);
 
   private final StatefulPlanNode planNode;
@@ -103,7 +102,9 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
       session = Velox4j.newSession(memoryManager);
       query =
           new Query(
-              planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
+              planNode,
+              VeloxQueryConfig.getConfig(getRuntimeContext()),
+              VeloxConnectorConfig.getConfig(getRuntimeContext()));
       allocator = new RootAllocator(Long.MAX_VALUE);
 
       task = session.queryOps().execute(query);
@@ -159,7 +160,9 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
       session = Velox4j.newSession(memoryManager);
       query =
           new Query(
-              planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
+              planNode,
+              VeloxQueryConfig.getConfig(getRuntimeContext()),
+              VeloxConnectorConfig.getConfig(getRuntimeContext()));
       allocator = new RootAllocator(Long.MAX_VALUE);
 
       task = session.queryOps().execute(query);
@@ -170,13 +173,11 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
     this.task.initializeState(0);
   }
 
-  @Override
-  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+  public String[] notifyCheckpointComplete(long checkpointId) throws Exception 
{
     // TODO: notify velox
-    this.task.notifyCheckpointComplete(checkpointId);
+    return this.task.notifyCheckpointComplete(checkpointId);
   }
 
-  @Override
   public void notifyCheckpointAborted(long checkpointId) throws Exception {
     // TODO: notify velox
     this.task.notifyCheckpointAborted(checkpointId);
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
index a5c4da5f0c..4fbfe8c7ad 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
@@ -17,10 +17,10 @@
 package org.apache.gluten.table.runtime.operators;
 
 import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
 import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
 import io.github.zhztheplayer.velox4j.data.RowVector;
@@ -96,7 +96,9 @@ public class GlutenVectorTwoInputOperator extends 
AbstractStreamOperator<Statefu
     session = Velox4j.newSession(memoryManager);
     query =
         new Query(
-            glutenPlan, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
+            glutenPlan,
+            VeloxQueryConfig.getConfig(getRuntimeContext()),
+            VeloxConnectorConfig.getConfig(getRuntimeContext()));
     task = session.queryOps().execute(query);
     LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
     LOG.debug("OutTypes: {}", outputTypes.keySet());
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
index 74c0208066..7dff789ae2 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
@@ -33,6 +33,15 @@ public class ReflectUtils {
     }
   }
 
+  public static Object getObjectField(String className, Object obj, String 
fieldName) {
+    try {
+      Class<?> clazz = Class.forName(className);
+      return getObjectField(clazz, obj, fieldName);
+    } catch (ClassNotFoundException e) {
+      throw new FlinkRuntimeException(e);
+    }
+  }
+
   public static Object invokeObjectMethod(
       Class<?> clazz, Object obj, String methodName, Class<?>[] paramTypes, 
Object[] paramValues) {
     try {
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java
index 1f8d46b6e9..d580a520af 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java
@@ -16,12 +16,14 @@
  */
 package org.apache.gluten.util;
 
+import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Map;
 
 /** Utils to add and get some infos to StreamConfig. */
@@ -66,4 +68,11 @@ public class Utils {
           String.format("Could not serialize object for key %s.", key), e);
     }
   }
+
+  public static Serializable constructCommitInfo(
+      long id, int subtaskIndex, int numberOfSubtasks, String[] committed) {
+    PartitionCommitInfo commitInfo =
+        new PartitionCommitInfo(id, subtaskIndex, numberOfSubtasks, committed);
+    return commitInfo;
+  }
 }
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index 74311055a6..252fbf94a4 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -176,12 +176,24 @@
       <version>${flink.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-files</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-json</artifactId>
       <version>${flink.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-csv</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q10.sql 
b/gluten-flink/ut/src/test/resources/nexmark/q10.sql
new file mode 100644
index 0000000000..6d13d7c85f
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q10.sql
@@ -0,0 +1,23 @@
+CREATE TABLE nexmark_q10 (
+  auction  BIGINT,
+  bidder  BIGINT,
+  price  BIGINT,
+  `dateTime`  TIMESTAMP(3),
+  extra  VARCHAR,
+  dt STRING,
+  hm STRING
+) PARTITIONED BY (dt, hm) WITH (
+  'connector' = 'filesystem',
+  'path' = 'file:///tmp/data/output/bid/',
+  'format' = 'csv',
+  'sink.partition-commit.trigger' = 'partition-time',
+  'sink.partition-commit.delay' = '1 min',
+  'sink.partition-commit.policy.kind' = 'success-file',
+  'partition.time-extractor.timestamp-pattern' = '$dt $hm:00',
+  'sink.rolling-policy.rollover-interval' = '1min',
+  'sink.rolling-policy.check-interval' = '1min'
+);
+
+INSERT INTO nexmark_q10
+SELECT auction, bidder, price, `dateTime`, extra, DATE_FORMAT(`dateTime`, 
'yyyy-MM-dd'), DATE_FORMAT(`dateTime`, 'HH:mm')
+FROM bid;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to