This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4feac9bba3 [GLUTEN-10064][FLINK] Support filesystem sink (#11300)
4feac9bba3 is described below
commit 4feac9bba366e032c7ad50fccc603d6fe3c29f8f
Author: kevinyhzou <[email protected]>
AuthorDate: Thu Dec 25 10:33:50 2025 +0800
[GLUTEN-10064][FLINK] Support filesystem sink (#11300)
* support filesystem sink
* fix reviews
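
For context, a sink of the kind this patch offloads can be declared from Flink SQL.
A minimal, hypothetical sketch (table name, schema, path, and format are
illustrative, not taken from this patch):

    // Hypothetical usage sketch: declare a partitioned filesystem sink and write
    // to it; with this patch, Gluten can offload the StreamingFileWriter stage of
    // such a sink to a Velox TableWriteNode.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class FileSystemSinkExample {
      public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
            "CREATE TABLE fs_sink (id BIGINT, name STRING, dt STRING) "
                + "PARTITIONED BY (dt) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = 'file:///tmp/fs_sink',"
                + " 'format' = 'csv')");
        tEnv.executeSql("INSERT INTO fs_sink VALUES (1, 'a', '2025-12-25')");
      }
    }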
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
.../plan/nodes/exec/common/CommonExecSink.java | 38 +-
.../apache/gluten/velox/FileSystemSinkFactory.java | 161 ++++++
.../gluten/velox/FuzzerSourceSinkFactory.java | 149 ++++++
.../org.apache.gluten.velox.VeloxSourceSinkFactory | 2 +
gluten-flink/runtime/pom.xml | 6 +
.../LegacySourceTransformationTranslator.java | 137 ------
.../translators/SinkTransformationTranslator.java | 543 ---------------------
.../api/operators/GlutenStreamSource.java | 35 ++
.../table/runtime/config/VeloxConnectorConfig.java | 58 +++
.../runtime/operators/GlutenOneInputOperator.java | 6 +-
.../runtime/operators/GlutenSourceFunction.java | 6 +-
.../operators/GlutenVectorOneInputOperator.java | 8 +-
.../operators/GlutenVectorSourceFunction.java | 19 +-
.../operators/GlutenVectorTwoInputOperator.java | 6 +-
.../java/org/apache/gluten/util/ReflectUtils.java | 9 +
.../main/java/org/apache/gluten/util/Utils.java | 9 +
gluten-flink/ut/pom.xml | 12 +
gluten-flink/ut/src/test/resources/nexmark/q10.sql | 23 +
20 files changed, 514 insertions(+), 717 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 099ee73438..aefa159f9b 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -61,7 +61,7 @@ jobs:
          sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
sudo .github/workflows/util/install-flink-resources.sh
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c
+          cd velox4j && git reset --hard 430d149837b9311df2787797fe0e0ec9abefd688
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index a73b56bb32..8424ccc190 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard 1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c
+git reset --hard 430d149837b9311df2787797fe0e0ec9abefd688
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index a1b0e0e7b7..8dbac05783 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -207,16 +207,25 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
       if (targetRowKind.isPresent()) {
         sinkTransform = applyRowKindSetter(sinkTransform, targetRowKind.get(), config);
}
-
- return (Transformation<Object>)
- applySinkProvider(
- sinkTransform,
- streamExecEnv,
- runtimeProvider,
- rowtimeFieldIndex,
- sinkParallelism,
- config,
- classLoader);
+ // --- Begin Gluten-specific code changes ---
+ Transformation<Object> transformation =
+ (Transformation<Object>)
+ applySinkProvider(
+ sinkTransform,
+ streamExecEnv,
+ runtimeProvider,
+ rowtimeFieldIndex,
+ sinkParallelism,
+ config,
+ classLoader);
+ return VeloxSourceSinkFactory.buildSink(
+ (Transformation) transformation,
+ Map.of(
+ Configuration.class.getName(),
+ streamExecEnv.getConfiguration(),
+ ResolvedSchema.class.getName(),
+ schema));
+ // --- End Gluten-specific code changes ---
}
   /** Apply an operator to filter or report error to process not-null values for not-null fields. */
@@ -467,13 +476,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
} else if (runtimeProvider instanceof SinkFunctionProvider) {
final SinkFunction<RowData> sinkFunction =
((SinkFunctionProvider) runtimeProvider).createSinkFunction();
-      // --- Begin Gluten-specific code changes ---
-      Transformation sinkTransformation =
-          createSinkFunctionTransformation(
-              sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism);
-      return VeloxSourceSinkFactory.buildSink(
-          sinkTransformation, Map.of(Configuration.class.getName(), env.getConfiguration()));
-      // --- End Gluten-specific code changes ---
+      return createSinkFunctionTransformation(
+          sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism);
} else if (runtimeProvider instanceof OutputFormatProvider) {
OutputFormat<RowData> outputFormat =
((OutputFormatProvider) runtimeProvider).createOutputFormat();
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FileSystemSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FileSystemSinkFactory.java
new file mode 100644
index 0000000000..657e321762
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FileSystemSinkFactory.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
+import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
+import io.github.zhztheplayer.velox4j.connector.FileSystemInsertTableHandle;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class FileSystemSinkFactory implements VeloxSourceSinkFactory {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean match(Transformation<RowData> transformation) {
+ if (transformation instanceof SinkTransformation) {
+ SinkTransformation<RowData, RowData> sinkTransformation =
+ (SinkTransformation<RowData, RowData>) transformation;
+ Transformation<RowData> inputTransformation =
+ (Transformation<RowData>) sinkTransformation.getInputs().get(0);
+ if (inputTransformation instanceof OneInputTransformation
+ && inputTransformation.getName().equals("PartitionCommitter")) {
+        OneInputTransformation<RowData, RowData> oneInputTransformation =
+            (OneInputTransformation<RowData, RowData>) inputTransformation;
+        Transformation<RowData> preInputTransformation =
+            (Transformation<RowData>) oneInputTransformation.getInputs().get(0);
+ return preInputTransformation.getName().equals("StreamingFileWriter");
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Transformation<RowData> buildVeloxSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+    throw new UnsupportedOperationException("Unimplemented method 'buildVeloxSource'");
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public Transformation<RowData> buildVeloxSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ SinkTransformation<RowData, RowData> sinkTransformation =
+ (SinkTransformation<RowData, RowData>) transformation;
+    OneInputTransformation<RowData, RowData> partitionCommitTransformation =
+        (OneInputTransformation<RowData, RowData>) sinkTransformation.getInputs().get(0);
+    OneInputTransformation<RowData, RowData> fileWriterTransformation =
+        (OneInputTransformation<RowData, RowData>) partitionCommitTransformation.getInputs().get(0);
+    OneInputStreamOperator<?, ?> operator = fileWriterTransformation.getOperator();
+    List<String> partitionKeys =
+        (List<String>) ReflectUtils.getObjectField(operator.getClass(), operator, "partitionKeys");
+ Map<String, String> tableParams = new HashMap<>();
+    Configuration tableOptions =
+        (Configuration)
+            ReflectUtils.getObjectField(
+                "org.apache.flink.connector.file.table.stream.PartitionCommitter",
+                partitionCommitTransformation.getOperator(),
+                "conf");
+ tableParams.putAll(tableOptions.toMap());
+
+    ResolvedSchema schema = (ResolvedSchema) parameters.get(ResolvedSchema.class.getName());
+    List<String> columnList = schema.getColumnNames();
+    List<Integer> partitionIndexes =
+        partitionKeys.stream().mapToInt(columnList::indexOf).boxed().collect(Collectors.toList());
+    org.apache.flink.table.types.logical.RowType inputType =
+        (org.apache.flink.table.types.logical.RowType)
+            ((InternalTypeInfo<?>) fileWriterTransformation.getInputType()).toLogicalType();
+    RowType inputDataColumns = (RowType) LogicalTypeConverter.toVLType(inputType);
+ FileSystemInsertTableHandle insertTableHandle =
+ new FileSystemInsertTableHandle(
+ fileWriterTransformation.getName(),
+ inputDataColumns,
+ partitionKeys,
+ partitionIndexes,
+ tableParams);
+ RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
+ TableWriteNode fileSystemWriteNode =
+ new TableWriteNode(
+ PlanNodeIdGenerator.newId(),
+ inputDataColumns,
+ inputDataColumns.getNames(),
+ null,
+ "connector-filesystem",
+ insertTableHandle,
+ false,
+ ignore,
+ CommitStrategy.NO_COMMIT,
+ List.of(new EmptyNode(inputDataColumns)));
+    GlutenOneInputOperator oneInputOperator =
+        new GlutenOneInputOperator(
+            new StatefulPlanNode(fileSystemWriteNode.getId(), fileSystemWriteNode),
+            PlanNodeIdGenerator.newId(),
+            inputDataColumns,
+            Map.of(fileSystemWriteNode.getId(), ignore));
+    GlutenOneInputOperatorFactory<?, ?> operatorFactory =
+        new GlutenOneInputOperatorFactory(oneInputOperator);
+ Transformation<RowData> veloxFileWriterTransformation =
+ new OneInputTransformation(
+ fileWriterTransformation.getInputs().get(0),
+ fileWriterTransformation.getName(),
+ operatorFactory,
+ fileWriterTransformation.getOutputType(),
+ fileWriterTransformation.getParallelism());
+ OneInputTransformation<RowData, RowData> newPartitionCommitTransformation =
+ new OneInputTransformation(
+ veloxFileWriterTransformation,
+ partitionCommitTransformation.getName(),
+ partitionCommitTransformation.getOperatorFactory(),
+ partitionCommitTransformation.getOutputType(),
+ partitionCommitTransformation.getParallelism());
+ DataStream<RowData> newInputStream =
+ new DataStream<RowData>(
+ sinkTransformation.getInputStream().getExecutionEnvironment(),
+ newPartitionCommitTransformation);
+ return new SinkTransformation<RowData, RowData>(
+ newInputStream,
+ sinkTransformation.getSink(),
+ sinkTransformation.getOutputType(),
+ sinkTransformation.getName(),
+ sinkTransformation.getParallelism(),
+ sinkTransformation.isParallelismConfigured(),
+ sinkTransformation.getSinkOperatorsUidHashes());
+ }
+}
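
Note on the rewrite buildVeloxSink performs above: only the StreamingFileWriter
stage is swapped for a Velox TableWriteNode; Flink's PartitionCommitter and the
final sink are kept as-is. Schematically:

    // Before: input -> StreamingFileWriter -> PartitionCommitter -> Sink
    // After:  input -> GlutenOneInputOperator(TableWriteNode, connector-filesystem)
    //                   -> PartitionCommitter -> Sink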
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FuzzerSourceSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FuzzerSourceSinkFactory.java
new file mode 100644
index 0000000000..16ac7f0831
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FuzzerSourceSinkFactory.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
+import io.github.zhztheplayer.velox4j.connector.DiscardDataTableHandle;
+import io.github.zhztheplayer.velox4j.connector.FuzzerConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.FuzzerTableHandle;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public class FuzzerSourceSinkFactory implements VeloxSourceSinkFactory {
+
+ @SuppressWarnings({"unchecked"})
+ @Override
+ public boolean match(Transformation<RowData> transformation) {
+ if (transformation instanceof SinkTransformation) {
+ SinkTransformation<RowData, RowData> sinkTransformation =
+ (SinkTransformation<RowData, RowData>) transformation;
+      Transformation<?> inputTransformation = sinkTransformation.getInputs().get(0);
+ if (sinkTransformation.getSink() instanceof DiscardingSink
+ && !inputTransformation.getName().equals("PartitionCommitter")) {
+ return true;
+ }
+ } else if (transformation instanceof LegacySourceTransformation) {
+      Function userFunction =
+          ((LegacySourceTransformation<RowData>) transformation).getOperator().getUserFunction();
+ return userFunction instanceof DataGeneratorSource;
+ }
+ return false;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public Transformation<RowData> buildVeloxSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ LegacySourceTransformation<RowData> sourceTransformation =
+ (LegacySourceTransformation<RowData>) transformation;
+    RowType outputType =
+        (RowType)
+            LogicalTypeConverter.toVLType(
+                ((InternalTypeInfo) sourceTransformation.getOutputType()).toLogicalType());
+ String id = PlanNodeIdGenerator.newId();
+    // Create the fuzzer table handle from two parameters: (1) a string as connector id; (2) an int as seed.
+    FuzzerTableHandle tableHandle = new FuzzerTableHandle("connector-fuzzer", 12367);
+    PlanNode tableScan = new TableScanNode(id, outputType, tableHandle, List.of());
+ GlutenStreamSource sourceOp =
+ new GlutenStreamSource(
+ new GlutenVectorSourceFunction(
+ new StatefulPlanNode(id, tableScan),
+ Map.of(id, outputType),
+ id,
+ new FuzzerConnectorSplit("connector-fuzzer", 1000)));
+ return new LegacySourceTransformation<RowData>(
+ sourceTransformation.getName(),
+ sourceOp,
+ sourceTransformation.getOutputType(),
+ sourceTransformation.getParallelism(),
+ sourceTransformation.getBoundedness(),
+ false);
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public Transformation<RowData> buildVeloxSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ SinkTransformation<RowData, RowData> sinkTransformation =
+ (SinkTransformation<RowData, RowData>) transformation;
+    RowType outputType =
+        (RowType)
+            LogicalTypeConverter.toVLType(
+                ((InternalTypeInfo) transformation.getOutputType()).toLogicalType());
+    // TODO: this is a constraint of Velox.
+    // The result type is ignored, as the data is written by Velox
+    // and no result needs to be returned.
+ RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
+ PlanNode plan =
+ new TableWriteNode(
+ PlanNodeIdGenerator.newId(),
+ outputType,
+ outputType.getNames(),
+ null,
+ "connector-fuzzer",
+ new DiscardDataTableHandle(),
+ false,
+ ignore,
+ CommitStrategy.NO_COMMIT,
+ List.of(new EmptyNode(outputType)));
+ GlutenOneInputOperatorFactory operatorFactory =
+ new GlutenOneInputOperatorFactory(
+ new GlutenVectorOneInputOperator(
+ new StatefulPlanNode(plan.getId(), plan),
+ PlanNodeIdGenerator.newId(),
+ outputType,
+ Map.of(plan.getId(), ignore)));
+    DataStream<RowData> newInputStream =
+        sinkTransformation
+            .getInputStream()
+            .transform("Writer", CommittableMessageTypeInfo.noOutput(), operatorFactory);
+ return new SinkTransformation<RowData, RowData>(
+ newInputStream,
+ sinkTransformation.getSink(),
+ sinkTransformation.getOutputType(),
+ sinkTransformation.getName(),
+ sinkTransformation.getParallelism(),
+ sinkTransformation.isParallelismConfigured(),
+ sinkTransformation.getSinkOperatorsUidHashes());
+ }
+}
diff --git a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
index 9d7623b7ec..b2d824ee12 100644
--- a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
+++ b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
@@ -2,3 +2,5 @@ org.apache.gluten.velox.FromElementsSourceFactory
org.apache.gluten.velox.KafkaSourceSinkFactory
org.apache.gluten.velox.PrintSinkFactory
org.apache.gluten.velox.NexmarkSourceFactory
+org.apache.gluten.velox.FileSystemSinkFactory
+org.apache.gluten.velox.FuzzerSourceSinkFactory
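
These entries are standard java.util.ServiceLoader registrations. The
VeloxSourceSinkFactory.buildSink helper invoked from CommonExecSink above is not
shown in this diff; a hedged sketch of how the dispatch plausibly works (an
assumption, not the committed implementation):

    // Assumed dispatch shape: try each registered factory in turn and let the
    // first match rewrite the sink; otherwise keep the vanilla Flink sink.
    @SuppressWarnings("unchecked")
    static Transformation<Object> buildSink(
        Transformation<RowData> transformation, Map<String, Object> parameters) {
      for (VeloxSourceSinkFactory factory :
          java.util.ServiceLoader.load(VeloxSourceSinkFactory.class)) {
        if (factory.match(transformation)) {
          return (Transformation<Object>) (Transformation<?>)
              factory.buildVeloxSink(transformation, parameters);
        }
      }
      return (Transformation<Object>) (Transformation<?>) transformation;
    }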
diff --git a/gluten-flink/runtime/pom.xml b/gluten-flink/runtime/pom.xml
index 41e6cd414e..017bbf7736 100644
--- a/gluten-flink/runtime/pom.xml
+++ b/gluten-flink/runtime/pom.xml
@@ -57,6 +57,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
deleted file mode 100644
index cba9057b24..0000000000
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.translators;
-
-import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
-import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
-import org.apache.gluten.util.LogicalTypeConverter;
-import org.apache.gluten.util.PlanNodeIdGenerator;
-
-import io.github.zhztheplayer.velox4j.connector.FuzzerConnectorSplit;
-import io.github.zhztheplayer.velox4j.connector.FuzzerTableHandle;
-import io.github.zhztheplayer.velox4j.plan.PlanNode;
-import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.TableScanNode;
-import io.github.zhztheplayer.velox4j.type.RowType;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
-import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.InputFormatOperatorFactory;
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Gluten {@link TransformationTranslator} for the {@link LegacySourceTransformation}.
- *
- * @param <OUT> The type of the elements that the {@link LegacySourceTransformation} we are
- * translating is producing.
- */
-@Internal
-public class LegacySourceTransformationTranslator<OUT>
-    extends SimpleTransformationTranslator<OUT, LegacySourceTransformation<OUT>> {
-
-  @Override
-  protected Collection<Integer> translateForBatchInternal(
-      final LegacySourceTransformation<OUT> transformation, final Context context) {
-    return translateInternal(transformation, context);
-  }
-
-  @Override
-  protected Collection<Integer> translateForStreamingInternal(
-      final LegacySourceTransformation<OUT> transformation, final Context context) {
-    return translateInternal(transformation, context);
-  }
-
-  private Collection<Integer> translateInternal(
-      final LegacySourceTransformation<OUT> transformation, final Context context) {
- checkNotNull(transformation);
- checkNotNull(context);
-
- final StreamGraph streamGraph = context.getStreamGraph();
- final String slotSharingGroup = context.getSlotSharingGroup();
- final int transformationId = transformation.getId();
- final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
-
- Function userFunction = transformation.getOperator().getUserFunction();
- StreamOperatorFactory<OUT> operatorFactory;
- String namePrefix = "";
- // --- Begin Gluten-specific code changes ---
- if (userFunction instanceof DataGeneratorSource) {
- RowType outputType =
- (RowType)
- LogicalTypeConverter.toVLType(
-                  ((InternalTypeInfo) transformation.getOutputType()).toLogicalType());
- String id = PlanNodeIdGenerator.newId();
- PlanNode tableScan =
- new TableScanNode(
-              id, outputType, new FuzzerTableHandle("connector-fuzzer", 12367), List.of());
- operatorFactory =
- SimpleOperatorFactory.of(
- new GlutenStreamSource(
- new GlutenVectorSourceFunction(
- new StatefulPlanNode(id, tableScan),
- Map.of(id, outputType),
- id,
- new FuzzerConnectorSplit("connector-fuzzer", 1000))));
- namePrefix = "Gluten ";
- } else {
- operatorFactory = transformation.getOperatorFactory();
- }
- // --- End Gluten-specific code changes ---
- streamGraph.addLegacySource(
- transformationId,
- slotSharingGroup,
- transformation.getCoLocationGroupKey(),
- operatorFactory,
- null,
- transformation.getOutputType(),
- namePrefix + "Source: " + transformation.getName());
-
-    if (transformation.getOperatorFactory() instanceof InputFormatOperatorFactory) {
-      streamGraph.setInputFormat(
-          transformationId,
-          ((InputFormatOperatorFactory<OUT>) transformation.getOperatorFactory()).getInputFormat());
- }
-
-    final int parallelism =
-        transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
-            ? transformation.getParallelism()
-            : executionConfig.getParallelism();
-    streamGraph.setParallelism(
-        transformationId, parallelism, transformation.isParallelismConfigured());
-    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
-
-    streamGraph.setSupportsConcurrentExecutionAttempts(
-        transformationId, transformation.isSupportsConcurrentExecutionAttempts());
-
- return Collections.singleton(transformationId);
- }
-}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
deleted file mode 100644
index 2bebc93af7..0000000000
--- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ /dev/null
@@ -1,543 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.translators;
-
-import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
-import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
-import org.apache.gluten.util.LogicalTypeConverter;
-import org.apache.gluten.util.PlanNodeIdGenerator;
-
-import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
-import io.github.zhztheplayer.velox4j.connector.DiscardDataTableHandle;
-import io.github.zhztheplayer.velox4j.plan.EmptyNode;
-import io.github.zhztheplayer.velox4j.plan.PlanNode;
-import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
-import io.github.zhztheplayer.velox4j.type.BigIntType;
-import io.github.zhztheplayer.velox4j.type.RowType;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
-import org.apache.flink.api.common.operators.SlotSharingGroup;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
-import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
-import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
-import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
-import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.Set;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for the {@link
- * org.apache.flink.streaming.api.transformations.SinkTransformation}.
- */
-@Internal
-public class SinkTransformationTranslator<Input, Output>
-    implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {
-
- private static final String COMMITTER_NAME = "Committer";
- private static final String WRITER_NAME = "Writer";
-
- @Override
- public Collection<Integer> translateForBatch(
- SinkTransformation<Input, Output> transformation, Context context) {
- return translateInternal(transformation, context, true);
- }
-
- @Override
- public Collection<Integer> translateForStreaming(
- SinkTransformation<Input, Output> transformation, Context context) {
- return translateInternal(transformation, context, false);
- }
-
-  private Collection<Integer> translateInternal(
-      SinkTransformation<Input, Output> transformation, Context context, boolean batch) {
- SinkExpander<Input> expander =
- new SinkExpander<>(
- transformation.getInputStream(),
- transformation.getSink(),
- transformation,
- context,
- batch);
- expander.expand();
- return Collections.emptyList();
- }
-
-  /**
-   * Expands the FLIP-143 Sink to a subtopology. Each part of the topology is created after the
-   * previous part of the topology has been completely configured by the user. For example, if a
-   * user explicitly sets the parallelism of the sink, each part of the subtopology can rely on the
-   * input having that parallelism.
-   */
- private static class SinkExpander<T> {
- private final SinkTransformation<T, ?> transformation;
- private final Sink<T> sink;
- private final Context context;
- private final DataStream<T> inputStream;
- private final StreamExecutionEnvironment executionEnvironment;
- private final Optional<Integer> environmentParallelism;
- private final boolean isBatchMode;
- private final boolean isCheckpointingEnabled;
-
- public SinkExpander(
- DataStream<T> inputStream,
- Sink<T> sink,
- SinkTransformation<T, ?> transformation,
- Context context,
- boolean isBatchMode) {
- this.inputStream = inputStream;
- this.executionEnvironment = inputStream.getExecutionEnvironment();
- this.environmentParallelism =
- executionEnvironment
- .getConfig()
- .toConfiguration()
- .getOptional(CoreOptions.DEFAULT_PARALLELISM);
-      this.isCheckpointingEnabled =
-          executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
- this.transformation = transformation;
- this.sink = sink;
- this.context = context;
- this.isBatchMode = isBatchMode;
- }
-
- private void expand() {
-
- final int sizeBefore = executionEnvironment.getTransformations().size();
-
- DataStream<T> prewritten = inputStream;
-
- if (sink instanceof SupportsPreWriteTopology) {
- prewritten =
- adjustTransformations(
- prewritten,
- ((SupportsPreWriteTopology<T>) sink)::addPreWriteTopology,
- true,
- sink instanceof SupportsConcurrentExecutionAttempts);
- }
-
-      if (sink instanceof SupportsPreCommitTopology) {
-        Preconditions.checkArgument(
-            sink instanceof SupportsCommitter,
-            "Sink with SupportsPreCommitTopology should implement SupportsCommitter");
-      }
-      if (sink instanceof SupportsPostCommitTopology) {
-        Preconditions.checkArgument(
-            sink instanceof SupportsCommitter,
-            "Sink with SupportsPostCommitTopology should implement SupportsCommitter");
- }
-
- if (sink instanceof SupportsCommitter) {
- addCommittingTopology(sink, prewritten);
- } else {
- // --- Begin Gluten-specific code changes ---
- if (sink instanceof DiscardingSink) {
-          RowType outputType =
-              (RowType)
-                  LogicalTypeConverter.toVLType(
-                      ((InternalTypeInfo) transformation.getOutputType()).toLogicalType());
-          // TODO: this is a constraint of Velox.
-          // The result type is ignored, as the data is written by Velox
-          // and no result needs to be returned.
-          RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
- PlanNode plan =
- new TableWriteNode(
- PlanNodeIdGenerator.newId(),
- outputType,
- outputType.getNames(),
- null,
- "connector-fuzzer",
- new DiscardDataTableHandle(),
- false,
- ignore,
- CommitStrategy.NO_COMMIT,
- List.of(new EmptyNode(outputType)));
- adjustTransformations(
- prewritten,
- input ->
- input.transform(
- WRITER_NAME,
- CommittableMessageTypeInfo.noOutput(),
- new GlutenOneInputOperatorFactory(
- new GlutenVectorOneInputOperator(
- new StatefulPlanNode(plan.getId(), plan),
- PlanNodeIdGenerator.newId(),
- outputType,
- Map.of(plan.getId(), ignore)))),
- false,
- sink instanceof SupportsConcurrentExecutionAttempts);
- } else {
- adjustTransformations(
- prewritten,
- input ->
- input.transform(
- WRITER_NAME,
- CommittableMessageTypeInfo.noOutput(),
- new SinkWriterOperatorFactory<>(sink)),
- false,
- sink instanceof SupportsConcurrentExecutionAttempts);
- }
- // --- End Gluten-specific code changes ---
- }
-
- getSinkTransformations(sizeBefore).forEach(context::transform);
-
- disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore));
-
-      // Remove all added sink subtransformations to avoid duplications and allow additional
- // expansions
- while (executionEnvironment.getTransformations().size() > sizeBefore) {
- executionEnvironment
- .getTransformations()
- .remove(executionEnvironment.getTransformations().size() - 1);
- }
- }
-
-    private List<Transformation<?>> getSinkTransformations(int sizeBefore) {
-      return executionEnvironment
-          .getTransformations()
-          .subList(sizeBefore, executionEnvironment.getTransformations().size());
-    }
-
-    /**
-     * Disables UC for all connections of operators within the sink expansion. This is necessary
-     * because committables need to be at the respective operators on notifyCheckpointComplete or
-     * else we can't commit all side-effects, which violates the contract of
-     * notifyCheckpointComplete.
-     */
-    private void disallowUnalignedCheckpoint(List<Transformation<?>> sinkTransformations) {
-      Optional<Transformation<?>> writerOpt =
-          sinkTransformations.stream().filter(SinkExpander::isWriter).findFirst();
-      Preconditions.checkState(writerOpt.isPresent(), "Writer transformation not found.");
- Transformation<?> writer = writerOpt.get();
- int indexOfWriter = sinkTransformations.indexOf(writer);
-
-      // check all transformation after the writer and recursively disable UC for all inputs
-      // up to the writer
-      Set<Integer> seen = new HashSet<>(writer.getId());
-      Queue<Transformation<?>> pending =
-          new ArrayDeque<>(
-              sinkTransformations.subList(indexOfWriter + 1, sinkTransformations.size()));
-
- while (!pending.isEmpty()) {
- Transformation<?> current = pending.poll();
- seen.add(current.getId());
-
- for (Transformation<?> input : current.getInputs()) {
-          if (input instanceof PartitionTransformation) {
-            ((PartitionTransformation<?>) input).getPartitioner().disableUnalignedCheckpoints();
- }
- if (seen.add(input.getId())) {
- pending.add(input);
- }
- }
- }
- }
-
- private static boolean isWriter(Transformation<?> t) {
- if (!(t instanceof OneInputTransformation)) {
- return false;
- }
- StreamOperatorFactory operatorFactory =
- ((OneInputTransformation<?, ?>) t).getOperatorFactory();
- return operatorFactory instanceof SinkWriterOperatorFactory
- || operatorFactory instanceof GlutenOneInputOperatorFactory;
- }
-
-    private <CommT, WriteResultT> void addCommittingTopology(
-        Sink<T> sink, DataStream<T> inputStream) {
-      SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) sink;
-      TypeInformation<CommittableMessage<CommT>> committableTypeInformation =
-          CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
-
- DataStream<CommittableMessage<CommT>> precommitted;
-      if (sink instanceof SupportsPreCommitTopology) {
-        SupportsPreCommitTopology<WriteResultT, CommT> preCommittingSink =
-            (SupportsPreCommitTopology<WriteResultT, CommT>) sink;
-        TypeInformation<CommittableMessage<WriteResultT>> writeResultTypeInformation =
-            CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);
-
-        DataStream<CommittableMessage<WriteResultT>> writerResult =
-            addWriter(sink, inputStream, writeResultTypeInformation);
-
-        precommitted =
-            adjustTransformations(
-                writerResult, preCommittingSink::addPreCommitTopology, true, false);
-      } else {
-        precommitted = addWriter(sink, inputStream, committableTypeInformation);
-      }
-
- DataStream<CommittableMessage<CommT>> committed =
- adjustTransformations(
- precommitted,
- pc ->
- pc.transform(
- COMMITTER_NAME,
- committableTypeInformation,
-                      new CommitterOperatorFactory<>(
-                          committingSink, isBatchMode, isCheckpointingEnabled)),
- false,
- false);
-
-      if (sink instanceof SupportsPostCommitTopology) {
-        DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
- adjustTransformations(
- postcommitted,
- pc -> {
-              ((SupportsPostCommitTopology<CommT>) sink).addPostCommitTopology(pc);
- return null;
- },
- true,
- false);
- }
- }
-
-    private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> addWriter(
- Sink<T> sink,
- DataStream<T> inputStream,
- TypeInformation<CommittableMessage<WriteResultT>> typeInformation) {
- DataStream<CommittableMessage<WriteResultT>> written =
- adjustTransformations(
- inputStream,
- input ->
-                  input.transform(
-                      WRITER_NAME, typeInformation, new SinkWriterOperatorFactory<>(sink)),
- false,
- sink instanceof SupportsConcurrentExecutionAttempts);
-
- return addFailOverRegion(written);
- }
-
-    /** Adds a batch exchange that materializes the output first. This is a no-op in STREAMING. */
- private <I> DataStream<I> addFailOverRegion(DataStream<I> input) {
- return new DataStream<>(
- executionEnvironment,
-          new PartitionTransformation<>(
-              input.getTransformation(), new ForwardPartitioner<>(), StreamExchangeMode.BATCH));
- }
-
-    /**
-     * Since user may set specific parallelism on sub topologies, we have to pay attention to the
-     * priority of parallelism at different levels, i.e. sub topologies customized parallelism >
-     * sinkTransformation customized parallelism > environment customized parallelism. In order to
-     * satisfy this rule and keep these customized parallelism values, the environment parallelism
-     * will be set to be {@link ExecutionConfig#PARALLELISM_DEFAULT} before adjusting
-     * transformations. SubTransformations, constructed after that, will have either the default
-     * value or customized value. In this way, any customized value will be discriminated from the
-     * default value and, for any subTransformation with the default parallelism value, we will then
-     * be able to let it inherit the parallelism value from the previous sinkTransformation. After
-     * the adjustment of transformations is closed, the environment parallelism will be restored
-     * back to its original value to keep the customized parallelism value at environment level.
-     */
- private <I, R> R adjustTransformations(
- DataStream<I> inputStream,
- Function<DataStream<I>, R> action,
- boolean isExpandedTopology,
- boolean supportsConcurrentExecutionAttempts) {
-
-      // Reset the environment parallelism temporarily before adjusting transformations,
-      // we can therefore be aware of any customized parallelism of the sub topology
- // set by users during the adjustment.
- executionEnvironment.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
-
-      int numTransformsBefore = executionEnvironment.getTransformations().size();
-      R result = action.apply(inputStream);
-      List<Transformation<?>> transformations = executionEnvironment.getTransformations();
- List<Transformation<?>> expandedTransformations =
- transformations.subList(numTransformsBefore, transformations.size());
-
- final CustomSinkOperatorUidHashes operatorsUidHashes =
- transformation.getSinkOperatorsUidHashes();
- for (Transformation<?> subTransformation : expandedTransformations) {
-
- String subUid = subTransformation.getUid();
- if (isExpandedTopology && subUid != null && !subUid.isEmpty()) {
-          checkState(
-              transformation.getUid() != null && !transformation.getUid().isEmpty(),
- "Sink "
- + transformation.getName()
- + " requires to set a uid since its customized topology"
- + " has set uid for some operators.");
- }
-
-        // Set the operator uid hashes to support stateful upgrades without prior uids
-        setOperatorUidHashIfPossible(
-            subTransformation, WRITER_NAME, operatorsUidHashes.getWriterUidHash());
-        setOperatorUidHashIfPossible(
-            subTransformation, COMMITTER_NAME, operatorsUidHashes.getCommitterUidHash());
- setOperatorUidHashIfPossible(
- subTransformation,
- StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME,
- operatorsUidHashes.getGlobalCommitterUidHash());
-
- concatUid(
- subTransformation,
- Transformation::getUid,
- Transformation::setUid,
- subTransformation.getName());
-
- concatProperty(
- subTransformation,
- Transformation::getCoLocationGroupKey,
- Transformation::setCoLocationGroupKey);
-
-        concatProperty(subTransformation, Transformation::getName, Transformation::setName);
-
-        concatProperty(
-            subTransformation, Transformation::getDescription, Transformation::setDescription);
-
- // handle coLocationGroupKey.
- String coLocationGroupKey = transformation.getCoLocationGroupKey();
-        if (coLocationGroupKey != null && subTransformation.getCoLocationGroupKey() == null) {
- subTransformation.setCoLocationGroupKey(coLocationGroupKey);
- }
-
- Optional<SlotSharingGroup> ssg = transformation.getSlotSharingGroup();
-
-        if (ssg.isPresent() && !subTransformation.getSlotSharingGroup().isPresent()) {
- subTransformation.setSlotSharingGroup(ssg.get());
- }
-
-        // remember that the environment parallelism has been set to be default
-        // at the beginning. SubTransformations, whose parallelism has been
-        // customized, will skip this part. The customized parallelism value set by user
-        // will therefore be kept.
-        if (subTransformation.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
-          // In this case, the subTransformation does not contain any customized
-          // parallelism value and will therefore inherit the parallelism value
-          // from the sinkTransformation.
-          subTransformation.setParallelism(
-              transformation.getParallelism(), transformation.isParallelismConfigured());
- }
-
-        if (subTransformation.getMaxParallelism() < 0 && transformation.getMaxParallelism() > 0) {
-          subTransformation.setMaxParallelism(transformation.getMaxParallelism());
- }
-
- if (subTransformation instanceof PhysicalTransformation) {
- PhysicalTransformation<?> physicalSubTransformation =
- (PhysicalTransformation<?>) subTransformation;
-
-          if (transformation.getChainingStrategy() != null) {
-            physicalSubTransformation.setChainingStrategy(transformation.getChainingStrategy());
-          }
-
-          // overrides the supportsConcurrentExecutionAttempts of transformation because
-          // it's not allowed to specify fine-grained concurrent execution attempts yet
-          physicalSubTransformation.setSupportsConcurrentExecutionAttempts(
-              supportsConcurrentExecutionAttempts);
- }
- }
-
-      // Restore the previous parallelism of the environment before adjusting transformations
-      if (environmentParallelism.isPresent()) {
-        executionEnvironment.getConfig().setParallelism(environmentParallelism.get());
-      } else {
-        executionEnvironment.getConfig().resetParallelism();
-      }
-
- return result;
- }
-
-    private void setOperatorUidHashIfPossible(
-        Transformation<?> transformation, String writerName, @Nullable String operatorUidHash) {
-      if (operatorUidHash == null || !transformation.getName().equals(writerName)) {
-        return;
-      }
-      transformation.setUidHash(operatorUidHash);
- }
-
- private void concatUid(
- Transformation<?> subTransformation,
- Function<Transformation<?>, String> getter,
- BiConsumer<Transformation<?>, String> setter,
- @Nullable String transformationName) {
-      if (transformationName != null && getter.apply(transformation) != null) {
-        // Use the same uid pattern as for Sink V1. We deliberately decided to use the uid
-        // pattern of Flink 1.13 because 1.14 did not have a dedicated committer operator.
-        if (transformationName.equals(COMMITTER_NAME)) {
-          final String committerFormat = "Sink Committer: %s";
-          setter.accept(
-              subTransformation, String.format(committerFormat, getter.apply(transformation)));
- return;
- }
-        // Set the writer operator uid to the sink's uid to support state migrations
- if (transformationName.equals(WRITER_NAME)) {
- setter.accept(subTransformation, getter.apply(transformation));
- return;
- }
-
-        // Use the same uid pattern as for Sink V1 in Flink 1.14.
- if (transformationName.equals(
- StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
- final String committerFormat = "Sink %s Global Committer";
- setter.accept(
-              subTransformation, String.format(committerFormat, getter.apply(transformation)));
- return;
- }
- }
- concatProperty(subTransformation, getter, setter);
- }
-
- private void concatProperty(
- Transformation<?> subTransformation,
- Function<Transformation<?>, String> getter,
- BiConsumer<Transformation<?>, String> setter) {
-      if (getter.apply(transformation) != null && getter.apply(subTransformation) != null) {
-        setter.accept(
-            subTransformation,
-            getter.apply(transformation) + ": " + getter.apply(subTransformation));
- }
- }
- }
-}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenStreamSource.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenStreamSource.java
index 7ffe20e7df..0281def229 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenStreamSource.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenStreamSource.java
@@ -17,11 +17,15 @@
package org.apache.gluten.streaming.api.operators;
import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.ReflectUtils;
+import org.apache.gluten.util.Utils;
import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.type.RowType;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import java.util.Map;
@@ -63,4 +67,35 @@ public class GlutenStreamSource extends StreamSource implements GlutenOperator {
public ConnectorSplit getConnectorSplit() {
return sourceFunction.getConnectorSplit();
}
+
+ @SuppressWarnings("rawtypes")
+ private SourceFunction.SourceContext getSourceContext() {
+ return (SourceFunction.SourceContext)
+ ReflectUtils.getObjectField(StreamSource.class, this, "ctx");
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+ String[] committed = sourceFunction.notifyCheckpointComplete(checkpointId);
+ SourceFunction.SourceContext sourceContext = getSourceContext();
+ TaskInfo taskInfo = getRuntimeContext().getTaskInfo();
+ if (sourceContext != null
+ && committed != null
+ && taskInfo.getTaskName().contains("StreamingFileWriter")) {
+ sourceContext.collect(
+ Utils.constructCommitInfo(
+ checkpointId,
+ taskInfo.getIndexOfThisSubtask(),
+ taskInfo.getNumberOfParallelSubtasks(),
+ committed));
+ }
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ super.notifyCheckpointAborted(checkpointId);
+ sourceFunction.notifyCheckpointAborted(checkpointId);
+ }
}
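
The ReflectUtils change itself (+9 lines) appears only in the diffstat above.
Judging from the call sites in this patch (one passes a Class, another a fully
qualified class name), the helper plausibly looks like this; an assumed sketch,
not the committed code:

    // Assumed shape of ReflectUtils.getObjectField, inferred from its callers.
    public static Object getObjectField(Class<?> clazz, Object target, String fieldName) {
      try {
        java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
        field.setAccessible(true); // reach private fields such as StreamSource#ctx
        return field.get(target);
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException(e);
      }
    }

    public static Object getObjectField(String className, Object target, String fieldName) {
      try {
        return getObjectField(Class.forName(className), target, fieldName);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
    }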
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
new file mode 100644
index 0000000000..13b195b0bd
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.config;
+
+import io.github.zhztheplayer.velox4j.config.Config;
+import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class VeloxConnectorConfig {
+
+ private static final List<String> CONNECTORS =
+ List.of(
+ "connector-nexmark",
+ "connector-kafka",
+ "connector-fuzzer",
+ "connector-filesystem",
+ "connector-from-elements",
+ "connector-print");
+ private static final String keyTaskIndex = "task_index";
+ private static final String keyQueryUUId = "query_uuid";
+
+ public static ConnectorConfig getConfig(RuntimeContext context) {
+ Map<String, String> configMap = new HashMap<>();
+ TaskInfo taskInfo = context.getTaskInfo();
+    configMap.put(keyTaskIndex, String.valueOf(taskInfo.getIndexOfThisSubtask()));
+    configMap.put(
+        keyQueryUUId,
+        UUID.nameUUIDFromBytes(context.getJobInfo().getJobId().toHexString().getBytes())
+            .toString());
+ Config commonConfig = Config.create(configMap);
+ Map<String, Config> connectorConfigMap = new HashMap<>();
+ for (String connectorId : CONNECTORS) {
+ connectorConfigMap.put(connectorId, commonConfig);
+ }
+ return ConnectorConfig.create(connectorConfigMap);
+ }
+}
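
The resulting ConnectorConfig tags every listed connector with the subtask index
and a job-scoped UUID. It is consumed where each Gluten operator builds its Velox
Query, replacing the former ConnectorConfig.empty(); the same pattern appears at
each call site changed below:

    // Thread the per-connector config into the Velox query instead of
    // ConnectorConfig.empty() (mirrors the operator diffs that follow).
    query =
        new Query(
            planNode,
            VeloxQueryConfig.getConfig(getRuntimeContext()),
            VeloxConnectorConfig.getConfig(getRuntimeContext()));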
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index d8f6f1a0b2..09af2f4576 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -17,11 +17,11 @@
package org.apache.gluten.table.runtime.operators;
import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
@@ -101,7 +101,9 @@ public class GlutenOneInputOperator extends TableStreamOperator<RowData>
LOG.debug("OutTypes: {}", outputTypes.keySet());
query =
new Query(
-            mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
+ mockInput,
+ VeloxQueryConfig.getConfig(getRuntimeContext()),
+ VeloxConnectorConfig.getConfig(getRuntimeContext()));
allocator = new RootAllocator(Long.MAX_VALUE);
task = session.queryOps().execute(query);
ExternalStreamConnectorSplit split =
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 360eb39a84..54c33f5159 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -16,11 +16,11 @@
*/
package org.apache.gluten.table.runtime.operators;
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
import io.github.zhztheplayer.velox4j.data.RowVector;
import io.github.zhztheplayer.velox4j.iterator.UpIterator;
@@ -94,7 +94,9 @@ public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
session = Velox4j.newSession(memoryManager);
query =
new Query(
-            planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
+ planNode,
+ VeloxQueryConfig.getConfig(getRuntimeContext()),
+ VeloxConnectorConfig.getConfig(getRuntimeContext()));
allocator = new RootAllocator(Long.MAX_VALUE);
SerialTask task = session.queryOps().execute(query);
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
index e859e15ca3..b55ebb6868 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
@@ -17,10 +17,10 @@
package org.apache.gluten.table.runtime.operators;
import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
@@ -69,7 +69,7 @@ public class GlutenVectorOneInputOperator extends TableStreamOperator<StatefulRe
private Session session;
private Query query;
private ExternalStreams.BlockingQueue inputQueue;
- private SerialTask task;
+ protected SerialTask task;
public GlutenVectorOneInputOperator(
StatefulPlanNode plan, String id, RowType inputType, Map<String, RowType> outputTypes) {
@@ -96,7 +96,9 @@ public class GlutenVectorOneInputOperator extends TableStreamOperator<StatefulRe
LOG.debug("OutTypes: {}", outputTypes.keySet());
query =
new Query(
- mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
+ mockInput,
+ VeloxQueryConfig.getConfig(getRuntimeContext()),
+ VeloxConnectorConfig.getConfig(getRuntimeContext()));
task = session.queryOps().execute(query);
}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 472bd0bfed..67bc802946 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -16,11 +16,11 @@
*/
package org.apache.gluten.table.runtime.operators;
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics;
import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
import io.github.zhztheplayer.velox4j.iterator.UpIterator;
import io.github.zhztheplayer.velox4j.memory.AllocationListener;
@@ -33,7 +33,6 @@ import io.github.zhztheplayer.velox4j.session.Session;
import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
import io.github.zhztheplayer.velox4j.type.RowType;
-import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -53,7 +52,7 @@ import java.util.Map;
* instead of RowData to avoid data convert.
*/
public class GlutenVectorSourceFunction extends RichParallelSourceFunction<StatefulElement>
- implements CheckpointedFunction, CheckpointListener {
+ implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(GlutenVectorSourceFunction.class);
private final StatefulPlanNode planNode;
@@ -103,7 +102,9 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
session = Velox4j.newSession(memoryManager);
query =
new Query(
- planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
+ planNode,
+ VeloxQueryConfig.getConfig(getRuntimeContext()),
+ VeloxConnectorConfig.getConfig(getRuntimeContext()));
allocator = new RootAllocator(Long.MAX_VALUE);
task = session.queryOps().execute(query);
@@ -159,7 +160,9 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
session = Velox4j.newSession(memoryManager);
query =
new Query(
- planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
+ planNode,
+ VeloxQueryConfig.getConfig(getRuntimeContext()),
+ VeloxConnectorConfig.getConfig(getRuntimeContext()));
allocator = new RootAllocator(Long.MAX_VALUE);
task = session.queryOps().execute(query);
@@ -170,13 +173,11 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction<State
this.task.initializeState(0);
}
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ public String[] notifyCheckpointComplete(long checkpointId) throws Exception {
// TODO: notify velox
- this.task.notifyCheckpointComplete(checkpointId);
+ return this.task.notifyCheckpointComplete(checkpointId);
}
- @Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
// TODO: notify velox
this.task.notifyCheckpointAborted(checkpointId);
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
index a5c4da5f0c..4fbfe8c7ad 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
@@ -17,10 +17,10 @@
package org.apache.gluten.table.runtime.operators;
import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
import io.github.zhztheplayer.velox4j.data.RowVector;
@@ -96,7 +96,9 @@ public class GlutenVectorTwoInputOperator extends AbstractStreamOperator<Statefu
session = Velox4j.newSession(memoryManager);
query =
new Query(
- glutenPlan, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
+ glutenPlan,
+ VeloxQueryConfig.getConfig(getRuntimeContext()),
+ VeloxConnectorConfig.getConfig(getRuntimeContext()));
task = session.queryOps().execute(query);
LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
LOG.debug("OutTypes: {}", outputTypes.keySet());
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
index 74c0208066..7dff789ae2 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
@@ -33,6 +33,15 @@ public class ReflectUtils {
}
}
+ public static Object getObjectField(String className, Object obj, String fieldName) {
+ try {
+ Class<?> clazz = Class.forName(className);
+ return getObjectField(clazz, obj, fieldName);
+ } catch (ClassNotFoundException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
public static Object invokeObjectMethod(
Class<?> clazz, Object obj, String methodName, Class<?>[] paramTypes, Object[] paramValues) {
try {
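
The new String-keyed getObjectField overload resolves the class by name and delegates to the existing Class-based variant, so callers can reach fields of classes that are not on their compile-time classpath. A hedged usage sketch; the class and field names are placeholders, only the helper itself comes from the commit:

    // Placeholder names: look up a class reflectively, then read one of its
    // (possibly private) fields from an instance of that class.
    Object value =
        ReflectUtils.getObjectField(
            "org.example.SomeInternalSink", // hypothetical class name
            sinkInstance,                   // an instance of that class
            "someField");                   // hypothetical field name
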
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java
index 1f8d46b6e9..d580a520af 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java
@@ -16,12 +16,14 @@
*/
package org.apache.gluten.util;
+import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Map;
/** Utils to add and get some infos to StreamConfig. */
@@ -66,4 +68,11 @@ public class Utils {
String.format("Could not serialize object for key %s.", key), e);
}
}
+
+ public static Serializable constructCommitInfo(
+ long id, int subtaskIndex, int numberOfSubtasks, String[] committed) {
+ PartitionCommitInfo commitInfo =
+ new PartitionCommitInfo(id, subtaskIndex, numberOfSubtasks, committed);
+ return commitInfo;
+ }
}
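
constructCommitInfo deliberately returns Serializable, so call sites need not reference PartitionCommitInfo (and thus flink-connector-files) directly. Combined with the notifyCheckpointComplete change earlier in this diff, the assumed flow is: after a checkpoint completes, collect the partitions Velox reports as committed and wrap them for the downstream partition committer. A sketch of that wiring (the call site shown is an assumption, not the commit's actual code):

    // Assumed caller-side wiring inside the enclosing operator/function wrapper.
    String[] committed = sourceFunction.notifyCheckpointComplete(checkpointId);
    if (committed != null && committed.length > 0) {
      java.io.Serializable info =
          Utils.constructCommitInfo(
              checkpointId,
              getRuntimeContext().getIndexOfThisSubtask(),
              getRuntimeContext().getNumberOfParallelSubtasks(),
              committed);
      // emit `info` downstream to the partition committer (hypothetical step)
    }
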
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index 74311055a6..252fbf94a4 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -176,12 +176,24 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-csv</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q10.sql b/gluten-flink/ut/src/test/resources/nexmark/q10.sql
new file mode 100644
index 0000000000..6d13d7c85f
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q10.sql
@@ -0,0 +1,23 @@
+CREATE TABLE nexmark_q10 (
+ auction BIGINT,
+ bidder BIGINT,
+ price BIGINT,
+ `dateTime` TIMESTAMP(3),
+ extra VARCHAR,
+ dt STRING,
+ hm STRING
+) PARTITIONED BY (dt, hm) WITH (
+ 'connector' = 'filesystem',
+ 'path' = 'file:///tmp/data/output/bid/',
+ 'format' = 'csv',
+ 'sink.partition-commit.trigger' = 'partition-time',
+ 'sink.partition-commit.delay' = '1 min',
+ 'sink.partition-commit.policy.kind' = 'success-file',
+ 'partition.time-extractor.timestamp-pattern' = '$dt $hm:00',
+ 'sink.rolling-policy.rollover-interval' = '1min',
+ 'sink.rolling-policy.check-interval' = '1min'
+);
+
+INSERT INTO nexmark_q10
+SELECT auction, bidder, price, `dateTime`, extra, DATE_FORMAT(`dateTime`, 'yyyy-MM-dd'), DATE_FORMAT(`dateTime`, 'HH:mm')
+FROM bid;
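
Note on running q10: Flink's streaming filesystem sink only finalizes part files and fires partition-commit policies on checkpoints, so a job executing this script needs checkpointing enabled. A minimal driver sketch using the standard Table API; the SQL strings stand for the statements above, abbreviated, and nothing here is part of the commit:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public final class Q10Driver {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without checkpoints the filesystem sink never publishes finished files.
        env.enableCheckpointing(60_000L);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql("CREATE TABLE nexmark_q10 (...)");     // the DDL above, abbreviated
        tEnv.executeSql("INSERT INTO nexmark_q10 SELECT ..."); // the INSERT above, abbreviated
      }
    }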