This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f4f306657 [INLONG-5587][Sort] Add filesystem connector module for
inlong metric and inlong audit (#5591)
f4f306657 is described below
commit f4f30665701d7833183bf7a66d1123e705f7da7e
Author: Xin Gong <[email protected]>
AuthorDate: Fri Aug 19 17:46:25 2022 +0800
[INLONG-5587][Sort] Add filesystem connector module for inlong metric and
inlong audit (#5591)
---
.../src/main/assemblies/sort-connectors.xml | 8 +
.../protocol/node/load/FileSystemLoadNode.java | 5 +-
.../org/apache/inlong/sort/base/Constants.java | 6 +
inlong-sort/sort-connectors/filesystem/pom.xml | 84 +++
.../sort/filesystem/AbstractFileSystemTable.java | 73 +++
.../sort/filesystem/FileSystemTableFactory.java | 217 +++++++
.../sort/filesystem/FileSystemTableSink.java | 685 +++++++++++++++++++++
.../filesystem/stream/AbstractStreamingWriter.java | 206 +++++++
.../filesystem/stream/StreamingFileWriter.java | 101 +++
.../sort/filesystem/stream/StreamingSink.java | 163 +++++
.../stream/compact/CompactFileWriter.java | 68 ++
.../org.apache.flink.table.factories.Factory | 19 +
inlong-sort/sort-connectors/pom.xml | 1 +
inlong-sort/sort-core/pom.xml | 6 +
.../sort/parser/FilesystemSqlParserTest.java | 129 ++++
licenses/inlong-sort-connectors/LICENSE | 10 +
licenses/inlong-sort-connectors/NOTICE | 15 +
17 files changed, 1794 insertions(+), 2 deletions(-)
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors.xml
b/inlong-distribution/src/main/assemblies/sort-connectors.xml
index 674ea90a5..22bfd7526 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors.xml
@@ -147,6 +147,14 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+
<directory>../inlong-sort/sort-connectors/filesystem/target</directory>
+ <outputDirectory>inlong-sort/connectors</outputDirectory>
+ <includes>
+
<include>sort-connector-filesystem-${project.version}.jar</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
<!-- module's 3td-licenses, notices-->
<fileSet>
<directory>../licenses/inlong-sort-connectors</directory>
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
index 31224ab1f..43e309254 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
@@ -25,6 +25,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
@@ -39,7 +40,7 @@ import java.util.Map;
@JsonTypeName("fileSystemLoad")
@Data
@NoArgsConstructor
-public class FileSystemLoadNode extends LoadNode implements Serializable {
+public class FileSystemLoadNode extends LoadNode implements InlongMetric,
Serializable {
private static final long serialVersionUID = -4836034838166667371L;
@@ -90,7 +91,7 @@ public class FileSystemLoadNode extends LoadNode implements
Serializable {
@Override
public Map<String, String> tableOptions() {
Map<String, String> map = super.tableOptions();
- map.put("connector", "filesystem");
+ map.put("connector", "filesystem-inlong");
map.put("path", path);
map.put("format", format);
if (null != partitionFields) {
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 19b1a90f4..9dd124284 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -88,4 +88,10 @@ public final class Constants {
.noDefaultValue()
.withDescription("INLONG AUDIT HOST + '&' + PORT");
+ /**
+ * Option {@code sink.ignore.changelog} (boolean, default {@code false}).
+ *
+ * <p>Per its description, when enabled upsert/delete rows are regarded as inserts.
+ * NOTE(review): the code that reads this option lives in the individual sinks, not in this
+ * class — confirm the exact semantics there.
+ */
+ public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
+ ConfigOptions.key("sink.ignore.changelog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Regard upsert delete as insert kind.");
+
}
diff --git a/inlong-sort/sort-connectors/filesystem/pom.xml
b/inlong-sort/sort-connectors/filesystem/pom.xml
new file mode 100644
index 000000000..f14c30087
--- /dev/null
+++ b/inlong-sort/sort-connectors/filesystem/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>sort-connectors</artifactId>
+ <groupId>org.apache.inlong</groupId>
+ <version>1.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>sort-connector-filesystem</artifactId>
+ <name>Apache InLong - Sort-connector-filesystem</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.inlong:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+
<artifact>org.apache.inlong:sort-connector-*</artifact>
+ <includes>
+ <include>org/apache/inlong/**</include>
+
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+ </includes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+
<pattern>org.apache.inlong.sort.base</pattern>
+ <shadedPattern>
+
org.apache.inlong.sort.filesystem.shaded.org.apache.inlong.sort.base
+ </shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/AbstractFileSystemTable.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/AbstractFileSystemTable.java
new file mode 100644
index 000000000..a9baf0a80
--- /dev/null
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/AbstractFileSystemTable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.util.List;
+
+import static
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_DEFAULT_NAME;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PATH;
+
+/**
+ * Common base for the file system table classes of this package.
+ *
+ * <p>On construction it copies the catalog table options into a {@link Configuration} and derives
+ * the physical schema, the partition keys, the target {@link Path} (from the {@code path} option)
+ * and the default partition name from them.
+ */
+abstract class AbstractFileSystemTable {
+
+    final DynamicTableFactory.Context context;
+    final ObjectIdentifier tableIdentifier;
+    final Configuration tableOptions;
+    final TableSchema schema;
+    final List<String> partitionKeys;
+    final Path path;
+    final String defaultPartName;
+
+    AbstractFileSystemTable(DynamicTableFactory.Context context) {
+        this.context = context;
+        this.tableIdentifier = context.getObjectIdentifier();
+
+        // Load the raw string options so that typed ConfigOptions can be read from them.
+        final Configuration options = new Configuration();
+        context.getCatalogTable().getOptions().forEach(options::setString);
+        this.tableOptions = options;
+
+        // Only the physical columns of the declared schema are kept.
+        this.schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        this.partitionKeys = context.getCatalogTable().getPartitionKeys();
+        this.path = new Path(options.get(PATH));
+        this.defaultPartName = options.get(PARTITION_DEFAULT_NAME);
+    }
+
+    /**
+     * Returns a view of the table options for the given format: keys are looked up with the
+     * {@code identifier + "."} prefix added.
+     */
+    ReadableConfig formatOptions(String identifier) {
+        final String formatPrefix = identifier + ".";
+        return new DelegatingConfiguration(tableOptions, formatPrefix);
+    }
+
+    /**
+     * Returns the row type handled by the format: every physical column except the partition
+     * keys, in schema order.
+     */
+    DataType getFormatDataType() {
+        final TableSchema.Builder nonPartitionColumns = TableSchema.builder();
+        schema.getTableColumns().stream()
+                .filter(column -> !partitionKeys.contains(column.getName()))
+                .forEachOrdered(column -> nonPartitionColumns.add(column));
+        return nonPartitionColumns.build().toRowDataType();
+    }
+}
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
new file mode 100644
index 000000000..58c585ebc
--- /dev/null
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.BulkReaderFormatFactory;
+import org.apache.flink.table.factories.BulkWriterFormatFactory;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.filesystem.FileSystemOptions;
+import org.apache.flink.table.filesystem.FileSystemTableSource;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.time.ZoneId.SHORT_IDS;
+import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * File system {@link TableFactory}.
+ *
+ * <p>1.The partition information should be in the file system path, whether
it's a temporary table
+ * or a catalog table. 2.Support insert into (append) and insert overwrite.
3.Support static and
+ * dynamic partition inserting.
+ *
+ * copy from flink-table-runtime-blink:1.13.2-rc2
+ * Add inlong metric option and inlong audit option for computing metric
+ */
+public class FileSystemTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "filesystem-inlong";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ validate(helper);
+ return new FileSystemTableSource(
+ context,
+ discoverDecodingFormat(context, BulkReaderFormatFactory.class),
+ discoverDecodingFormat(context,
DeserializationFormatFactory.class),
+ discoverFormatFactory(context));
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ validate(helper);
+ return new FileSystemTableSink(
+ context,
+ discoverDecodingFormat(context, BulkReaderFormatFactory.class),
+ discoverDecodingFormat(context,
DeserializationFormatFactory.class),
+ discoverFormatFactory(context),
+ discoverEncodingFormat(context, BulkWriterFormatFactory.class),
+ discoverEncodingFormat(context,
SerializationFormatFactory.class));
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FileSystemOptions.PATH);
+ options.add(FactoryUtil.FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FileSystemOptions.PARTITION_DEFAULT_NAME);
+ options.add(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE);
+ options.add(FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL);
+ options.add(FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL);
+ options.add(FileSystemOptions.SINK_SHUFFLE_BY_PARTITION);
+ options.add(FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND);
+ options.add(FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS);
+
options.add(FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+ options.add(FileSystemOptions.SINK_PARTITION_COMMIT_TRIGGER);
+ options.add(FileSystemOptions.SINK_PARTITION_COMMIT_DELAY);
+
options.add(FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE);
+ options.add(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
+ options.add(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_CLASS);
+ options.add(FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
+ options.add(FileSystemOptions.AUTO_COMPACTION);
+ options.add(FileSystemOptions.COMPACTION_FILE_SIZE);
+ options.add(FileSystemOptions.SINK_PARALLELISM);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
+ options.add(IGNORE_ALL_CHANGELOG);
+ return options;
+ }
+
+ private void validate(FactoryUtil.TableFactoryHelper helper) {
+ // Except format options, some formats like parquet and orc can not
list all supported
+ // options.
+ helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) +
".");
+
+ // validate time zone of watermark
+ String watermarkTimeZone =
+ helper.getOptions()
+
.get(FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE);
+ if (watermarkTimeZone.startsWith("UTC+")
+ || watermarkTimeZone.startsWith("UTC-")
+ || SHORT_IDS.containsKey(watermarkTimeZone)) {
+ throw new ValidationException(
+ String.format(
+ "The supported watermark time zone is either a
full name such as 'America/Los_Angeles',"
+ + " or a custom time zone id such as
'GMT-08:00', but configured time zone is "
+ + "'%s'.",
+ watermarkTimeZone));
+ }
+ }
+
+ private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I>
discoverDecodingFormat(
+ Context context, Class<F> formatFactoryClass) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ if (formatFactoryExists(context, formatFactoryClass)) {
+ return helper.discoverDecodingFormat(formatFactoryClass,
FactoryUtil.FORMAT);
+ } else {
+ return null;
+ }
+ }
+
+ private <I, F extends EncodingFormatFactory<I>> EncodingFormat<I>
discoverEncodingFormat(
+ Context context, Class<F> formatFactoryClass) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ if (formatFactoryExists(context, formatFactoryClass)) {
+ return helper.discoverEncodingFormat(formatFactoryClass,
FactoryUtil.FORMAT);
+ } else {
+ return null;
+ }
+ }
+
+ private FileSystemFormatFactory discoverFormatFactory(Context context) {
+ if (formatFactoryExists(context, FileSystemFormatFactory.class)) {
+ Configuration options =
Configuration.fromMap(context.getCatalogTable().getOptions());
+ String identifier = options.get(FactoryUtil.FORMAT);
+ return FactoryUtil.discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ FileSystemFormatFactory.class,
+ identifier);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Returns true if the format factory can be found using the given factory
base class and
+ * identifier.
+ */
+ private boolean formatFactoryExists(Context context, Class<?>
factoryClass) {
+ Configuration options =
Configuration.fromMap(context.getCatalogTable().getOptions());
+ String identifier = options.get(FactoryUtil.FORMAT);
+ if (identifier == null) {
+ throw new ValidationException(
+ String.format(
+ "Table options do not contain an option key '%s'
for discovering a format.",
+ FactoryUtil.FORMAT.key()));
+ }
+
+ final List<Factory> factories = new LinkedList<>();
+ ServiceLoader.load(Factory.class, context.getClassLoader())
+ .iterator()
+ .forEachRemaining(factories::add);
+
+ final List<Factory> foundFactories =
+ factories.stream()
+ .filter(f ->
factoryClass.isAssignableFrom(f.getClass()))
+ .collect(Collectors.toList());
+
+ final List<Factory> matchingFactories =
+ foundFactories.stream()
+ .filter(f -> f.factoryIdentifier().equals(identifier))
+ .collect(Collectors.toList());
+
+ return !matchingFactories.isEmpty();
+ }
+}
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
new file mode 100644
index 000000000..60bcf9332
--- /dev/null
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.filesystem.DeserializationSchemaAdapter;
+import org.apache.flink.table.filesystem.EmptyMetaStoreFactory;
+import org.apache.flink.table.filesystem.FileSystemFactory;
+import org.apache.flink.table.filesystem.FileSystemOptions;
+import org.apache.flink.table.filesystem.FileSystemOutputFormat;
+import org.apache.flink.table.filesystem.OutputFormatFactory;
+import org.apache.flink.table.filesystem.PartitionComputer;
+import org.apache.flink.table.filesystem.RowDataPartitionComputer;
+import org.apache.flink.table.filesystem.SerializationSchemaAdapter;
+import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+import org.apache.flink.table.filesystem.stream.compact.CompactBulkReader;
+import org.apache.flink.table.filesystem.stream.compact.CompactReader;
+import
org.apache.flink.table.filesystem.stream.compact.FileInputFormatCompactReader;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
+import org.apache.inlong.sort.filesystem.stream.StreamingSink;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL;
+import static
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
+import static
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL;
+import static
org.apache.flink.table.filesystem.stream.compact.CompactOperator.convertToUncompacted;
+import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * File system {@link DynamicTableSink}.
+ */
+public class FileSystemTableSink extends AbstractFileSystemTable
+ implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
+
+    // For compaction reading: readers used by createCompactReaderFactory to re-read written files.
+    // Each may be null when the format offers no such variant.
+    @Nullable
+    private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat;
+    @Nullable
+    private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat;
+    @Nullable
+    private final FileSystemFormatFactory formatFactory;
+
+    // For Writing: at most one of these is expected to be used for a given format.
+    // NOTE(review): the writer-selection logic (createWriter) is not visible here — confirm.
+    @Nullable
+    private final EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat;
+    @Nullable
+    private final EncodingFormat<SerializationSchema<RowData>> serializationFormat;
+
+    // Batch-mode write flags. Streaming mode rejects overwrite (see consume).
+    // NOTE(review): presumably set through the SupportsOverwrite / SupportsPartitioning callbacks.
+    private boolean overwrite = false;
+    private boolean dynamicGrouping = false;
+    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
+
+    // Value of the sink parallelism option; null means "use the input stream's parallelism".
+    @Nullable
+    private Integer configuredParallelism;
+
+    // InLong metric / audit options, forwarded to the streaming writers.
+    private String inlongMetric;
+    private String inlongAudit;
+
+ FileSystemTableSink(
+ DynamicTableFactory.Context context,
+ @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>>
bulkReaderFormat,
+ @Nullable DecodingFormat<DeserializationSchema<RowData>>
deserializationFormat,
+ @Nullable FileSystemFormatFactory formatFactory,
+ @Nullable EncodingFormat<BulkWriter.Factory<RowData>>
bulkWriterFormat,
+ @Nullable EncodingFormat<SerializationSchema<RowData>>
serializationFormat) {
+ super(context);
+ this.bulkReaderFormat = bulkReaderFormat;
+ this.deserializationFormat = deserializationFormat;
+ this.formatFactory = formatFactory;
+ if (Stream.of(bulkWriterFormat, serializationFormat, formatFactory)
+ .allMatch(Objects::isNull)) {
+ Configuration options =
Configuration.fromMap(context.getCatalogTable().getOptions());
+ String identifier = options.get(FactoryUtil.FORMAT);
+ throw new ValidationException(
+ String.format(
+ "Could not find any format factory for identifier
'%s' in the classpath.",
+ identifier));
+ }
+ this.bulkWriterFormat = bulkWriterFormat;
+ this.serializationFormat = serializationFormat;
+ this.configuredParallelism =
tableOptions.get(FileSystemOptions.SINK_PARALLELISM);
+ this.inlongMetric = tableOptions.get(INLONG_METRIC);
+ this.inlongAudit = tableOptions.get(INLONG_AUDIT);
+
ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric,
inlongAudit);
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
+ return (DataStreamSinkProvider) dataStream -> consume(dataStream,
sinkContext);
+ }
+
+    /**
+     * Attaches the batch sink for bounded input and the streaming sink otherwise.
+     *
+     * @throws IllegalStateException if overwrite was requested on unbounded input
+     */
+    private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) {
+        final int upstreamParallelism = dataStream.getParallelism();
+        // The configured sink parallelism wins; otherwise inherit the input's parallelism.
+        final int parallelism =
+                configuredParallelism == null ? upstreamParallelism : configuredParallelism;
+
+        if (sinkContext.isBounded()) {
+            return createBatchSink(dataStream, sinkContext, parallelism);
+        }
+        if (overwrite) {
+            throw new IllegalStateException("Streaming mode not support overwrite.");
+        }
+        return createStreamingSink(dataStream, sinkContext, parallelism);
+    }
+
+ private RowDataPartitionComputer partitionComputer() {
+ return new RowDataPartitionComputer(
+ defaultPartName,
+ schema.getFieldNames(),
+ schema.getFieldDataTypes(),
+ partitionKeys.toArray(new String[0]));
+ }
+
+ private DataStreamSink<RowData> createBatchSink(
+ DataStream<RowData> inputStream, Context sinkContext, final int
parallelism) {
+ FileSystemOutputFormat.Builder<RowData> builder = new
FileSystemOutputFormat.Builder<>();
+ builder.setPartitionComputer(partitionComputer());
+ builder.setDynamicGrouped(dynamicGrouping);
+ builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
+ builder.setFormatFactory(createOutputFormatFactory(sinkContext));
+ builder.setMetaStoreFactory(new EmptyMetaStoreFactory(path));
+ builder.setOverwrite(overwrite);
+ builder.setStaticPartitions(staticPartitions);
+ builder.setTempPath(toStagingPath());
+ builder.setOutputFileConfig(
+ OutputFileConfig.builder()
+ .withPartPrefix("part-" + UUID.randomUUID().toString())
+ .build());
+ return inputStream
+ .writeUsingOutputFormat(builder.build())
+ .setParallelism(parallelism)
+ .name("Filesystem");
+ }
+
+    /**
+     * Builds the unbounded (streaming) sink pipeline.
+     *
+     * <p>Rows are bucketed by partition via {@link TableBucketAssigner} and rolled with a
+     * {@link TableRollingPolicy}. They are written through a {@link StreamingFileSink} buckets
+     * builder: row format when the writer is an {@link Encoder}, bulk format otherwise. When
+     * {@link FileSystemOptions#AUTO_COMPACTION} is enabled, part files get an "uncompacted" prefix
+     * and the writer stage is {@link StreamingSink#compactionWriter}; otherwise it is
+     * {@link StreamingSink#writer}. Both variants receive the inlong metric and audit options.
+     * The resulting {@link PartitionCommitInfo} stream is finally handed to {@link StreamingSink#sink}.
+     *
+     * @throws TableException if auto-compaction is enabled but no compaction reader is available
+     */
+    private DataStreamSink<?> createStreamingSink(
+            DataStream<RowData> dataStream, Context sinkContext, final int parallelism) {
+        FileSystemFactory fsFactory = FileSystem::get;
+        RowDataPartitionComputer computer = partitionComputer();
+
+        boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION);
+        Object writer = createWriter(sinkContext);
+        // Encoders use the row format; any other writer is treated as a BulkWriter.Factory below.
+        boolean isEncoder = writer instanceof Encoder;
+        TableBucketAssigner assigner = new TableBucketAssigner(computer);
+        // NOTE(review): the first flag is presumably "roll on checkpoint" (always for bulk writers
+        // and when compacting) — confirm against TableRollingPolicy's constructor.
+        TableRollingPolicy rollingPolicy =
+                new TableRollingPolicy(
+                        !isEncoder || autoCompaction,
+                        tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
+                        tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
+
+        // With compaction enabled the part prefix is marked as uncompacted (presumably so the
+        // compaction stage can distinguish its inputs from its own output).
+        String randomPrefix = "part-" + UUID.randomUUID().toString();
+        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder();
+        fileNamingBuilder =
+                autoCompaction
+                        ? fileNamingBuilder.withPartPrefix(convertToUncompacted(randomPrefix))
+                        : fileNamingBuilder.withPartPrefix(randomPrefix);
+        OutputFileConfig fileNamingConfig = fileNamingBuilder.build();
+
+        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
+        if (isEncoder) {
+            //noinspection unchecked
+            bucketsBuilder =
+                    StreamingFileSink.forRowFormat(
+                                    path,
+                                    new ProjectionEncoder((Encoder<RowData>) writer, computer))
+                            .withBucketAssigner(assigner)
+                            .withOutputFileConfig(fileNamingConfig)
+                            .withRollingPolicy(rollingPolicy);
+        } else {
+            //noinspection unchecked
+            bucketsBuilder =
+                    StreamingFileSink.forBulkFormat(
+                                    path,
+                                    new ProjectionBulkFactory(
+                                            (BulkWriter.Factory<RowData>) writer, computer))
+                            .withBucketAssigner(assigner)
+                            .withOutputFileConfig(fileNamingConfig)
+                            .withRollingPolicy(rollingPolicy);
+        }
+
+        long bucketCheckInterval = tableOptions.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
+
+        DataStream<PartitionCommitInfo> writerStream;
+        if (autoCompaction) {
+            // Target compacted file size; falls back to the rolling file size when not configured.
+            long compactionSize =
+                    tableOptions
+                            .getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
+                            .orElse(tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE))
+                            .getBytes();
+
+            CompactReader.Factory<RowData> reader =
+                    createCompactReaderFactory(sinkContext)
+                            .orElseThrow(
+                                    () ->
+                                            new TableException(
+                                                    "Please implement available reader for compaction:"
+                                                            + " BulkFormat, FileInputFormat."));
+
+            writerStream =
+                    StreamingSink.compactionWriter(
+                            dataStream,
+                            bucketCheckInterval,
+                            bucketsBuilder,
+                            fsFactory,
+                            path,
+                            reader,
+                            compactionSize,
+                            parallelism,
+                            inlongMetric,
+                            inlongAudit);
+        } else {
+            writerStream =
+                    StreamingSink.writer(
+                            dataStream, bucketCheckInterval, bucketsBuilder, parallelism, inlongMetric, inlongAudit);
+        }
+
+        // NOTE(review): presumably appends the partition-commit stage that consumes the
+        // PartitionCommitInfo emitted by the writers — confirm in StreamingSink#sink.
+        return StreamingSink.sink(
+                writerStream,
+                path,
+                tableIdentifier,
+                partitionKeys,
+                new EmptyMetaStoreFactory(path),
+                fsFactory,
+                tableOptions);
+    }
+
+ private Optional<CompactReader.Factory<RowData>>
createCompactReaderFactory(Context context) {
+ DataType producedDataType = schema.toRowDataType();
+ if (bulkReaderFormat != null) {
+ BulkFormat<RowData, FileSourceSplit> format =
+ bulkReaderFormat.createRuntimeDecoder(
+ createSourceContext(context), producedDataType);
+ return Optional.of(CompactBulkReader.factory(format));
+ } else if (formatFactory != null) {
+ InputFormat<RowData, ?> format =
formatFactory.createReader(createReaderContext());
+ if (format instanceof FileInputFormat) {
+ //noinspection unchecked
+ return Optional.of(
+
FileInputFormatCompactReader.factory((FileInputFormat<RowData>) format));
+ }
+ } else if (deserializationFormat != null) {
+ // NOTE, we need pass full format types to deserializationFormat
+ DeserializationSchema<RowData> decoder =
+ deserializationFormat.createRuntimeDecoder(
+ createSourceContext(context), getFormatDataType());
+ int[] projectedFields = IntStream.range(0,
schema.getFieldCount()).toArray();
+ DeserializationSchemaAdapter format =
+ new DeserializationSchemaAdapter(
+ decoder, schema, projectedFields, partitionKeys,
defaultPartName);
+ return Optional.of(CompactBulkReader.factory(format));
+ }
+ return Optional.empty();
+ }
+
+ private DynamicTableSource.Context createSourceContext(Context context) {
+ return new DynamicTableSource.Context() {
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(DataType
producedDataType) {
+ return context.createTypeInformation(producedDataType);
+ }
+
+ @Override
+ public DynamicTableSource.DataStructureConverter
createDataStructureConverter(
+ DataType producedDataType) {
+ throw new TableException("Compaction reader not support
DataStructure converter.");
+ }
+ };
+ }
+
+ private FileSystemFormatFactory.ReaderContext createReaderContext() {
+ return new FileSystemFormatFactory.ReaderContext() {
+ @Override
+ public TableSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public ReadableConfig getFormatOptions() {
+ return formatOptions(formatFactory.factoryIdentifier());
+ }
+
+ @Override
+ public List<String> getPartitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
+ public String getDefaultPartName() {
+ return defaultPartName;
+ }
+
+ @Override
+ public Path[] getPaths() {
+ return new Path[]{path};
+ }
+
+ @Override
+ public int[] getProjectFields() {
+ return IntStream.range(0, schema.getFieldCount()).toArray();
+ }
+
+ @Override
+ public long getPushedDownLimit() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public List<ResolvedExpression> getPushedDownFilters() {
+ return Collections.emptyList();
+ }
+ };
+ }
+
+ private Path toStagingPath() {
+ Path stagingDir = new Path(path, ".staging_" +
System.currentTimeMillis());
+ try {
+ FileSystem fs = stagingDir.getFileSystem();
+ Preconditions.checkState(
+ fs.exists(stagingDir) || fs.mkdirs(stagingDir),
+ "Failed to create staging dir " + stagingDir);
+ return stagingDir;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private OutputFormatFactory<RowData> createOutputFormatFactory(Context
sinkContext) {
+ Object writer = createWriter(sinkContext);
+ return writer instanceof Encoder
+ ? path -> createEncoderOutputFormat((Encoder<RowData>) writer,
path)
+ : path ->
createBulkWriterOutputFormat((BulkWriter.Factory<RowData>) writer, path);
+ }
+
+ private Object createWriter(Context sinkContext) {
+ if (bulkWriterFormat != null) {
+ return bulkWriterFormat.createRuntimeEncoder(sinkContext,
getFormatDataType());
+ } else if (serializationFormat != null) {
+ return new SerializationSchemaAdapter(
+ serializationFormat.createRuntimeEncoder(sinkContext,
getFormatDataType()));
+ } else {
+ throw new TableException("Can not find format factory.");
+ }
+ }
+
+ private void checkConfiguredParallelismAllowed(ChangelogMode
requestChangelogMode) {
+ final Integer parallelism = this.configuredParallelism;
+ if (parallelism == null) {
+ return;
+ }
+ if (!requestChangelogMode.containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ String.format(
+ "Currently, filesystem sink doesn't support
setting parallelism (%d) by '%s' "
+ + "when the input stream is not INSERT
only. The row kinds of input stream are "
+ + "[%s]",
+ parallelism,
+ FileSystemOptions.SINK_PARALLELISM.key(),
+ requestChangelogMode.getContainedKinds().stream()
+ .map(RowKind::shortString)
+ .collect(Collectors.joining(","))));
+ }
+ }
+
+ private static OutputFormat<RowData> createBulkWriterOutputFormat(
+ BulkWriter.Factory<RowData> factory, Path path) {
+ return new OutputFormat<RowData>() {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient BulkWriter<RowData> writer;
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ this.writer =
+ factory.create(
+ path.getFileSystem().create(path,
FileSystem.WriteMode.OVERWRITE));
+ }
+
+ @Override
+ public void writeRecord(RowData record) throws IOException {
+ writer.addElement(record);
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.flush();
+ writer.finish();
+ }
+ };
+ }
+
+ private static OutputFormat<RowData> createEncoderOutputFormat(
+ Encoder<RowData> encoder, Path path) {
+ return new OutputFormat<RowData>() {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient FSDataOutputStream output;
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ this.output = path.getFileSystem().create(path,
FileSystem.WriteMode.OVERWRITE);
+ }
+
+ @Override
+ public void writeRecord(RowData record) throws IOException {
+ encoder.encode(record, output);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.output.flush();
+ this.output.close();
+ }
+ };
+ }
+
+ private LinkedHashMap<String, String> toPartialLinkedPartSpec(Map<String,
String> part) {
+ LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+ for (String partitionKey : partitionKeys) {
+ if (part.containsKey(partitionKey)) {
+ partSpec.put(partitionKey, part.get(partitionKey));
+ }
+ }
+ return partSpec;
+ }
+
+ @Override
+ public boolean requiresPartitionGrouping(boolean supportsGrouping) {
+ this.dynamicGrouping = supportsGrouping;
+ return dynamicGrouping;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ checkConfiguredParallelismAllowed(requestedMode);
+ boolean ignoreChangelog = tableOptions.get(IGNORE_ALL_CHANGELOG);
+ if (ignoreChangelog) {
+ return ChangelogMode.all();
+ }
+ if (bulkWriterFormat != null) {
+ return bulkWriterFormat.getChangelogMode();
+ } else if (serializationFormat != null) {
+ return serializationFormat.getChangelogMode();
+ } else {
+ throw new TableException("Can not find format factory.");
+ }
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ FileSystemTableSink sink =
+ new FileSystemTableSink(
+ context,
+ bulkReaderFormat,
+ deserializationFormat,
+ formatFactory,
+ bulkWriterFormat,
+ serializationFormat);
+ sink.overwrite = overwrite;
+ sink.dynamicGrouping = dynamicGrouping;
+ sink.staticPartitions = staticPartitions;
+ return sink;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Filesystem";
+ }
+
+ @Override
+ public void applyOverwrite(boolean overwrite) {
+ this.overwrite = overwrite;
+ }
+
+ @Override
+ public void applyStaticPartition(Map<String, String> partition) {
+ this.staticPartitions = toPartialLinkedPartSpec(partition);
+ }
+
+ /**
+ * Table bucket assigner, wrap {@link PartitionComputer}.
+ */
+ public static class TableBucketAssigner implements BucketAssigner<RowData,
String> {
+
+ private final PartitionComputer<RowData> computer;
+
+ public TableBucketAssigner(PartitionComputer<RowData> computer) {
+ this.computer = computer;
+ }
+
+ @Override
+ public String getBucketId(RowData element, Context context) {
+ try {
+ return PartitionPathUtils.generatePartitionPath(
+ computer.generatePartValues(element));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public SimpleVersionedSerializer<String> getSerializer() {
+ return SimpleVersionedStringSerializer.INSTANCE;
+ }
+ }
+
+ /**
+ * Table {@link RollingPolicy}, it extends {@link CheckpointRollingPolicy}
for bulk writers.
+ */
+ public static class TableRollingPolicy extends
CheckpointRollingPolicy<RowData, String> {
+
+ private final boolean rollOnCheckpoint;
+ private final long rollingFileSize;
+ private final long rollingTimeInterval;
+
+ public TableRollingPolicy(
+ boolean rollOnCheckpoint, long rollingFileSize, long
rollingTimeInterval) {
+ this.rollOnCheckpoint = rollOnCheckpoint;
+ Preconditions.checkArgument(rollingFileSize > 0L);
+ Preconditions.checkArgument(rollingTimeInterval > 0L);
+ this.rollingFileSize = rollingFileSize;
+ this.rollingTimeInterval = rollingTimeInterval;
+ }
+
+ @Override
+ public boolean shouldRollOnCheckpoint(PartFileInfo<String>
partFileState) {
+ try {
+ return rollOnCheckpoint || partFileState.getSize() >
rollingFileSize;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
RowData element)
+ throws IOException {
+ return partFileState.getSize() > rollingFileSize;
+ }
+
+ @Override
+ public boolean shouldRollOnProcessingTime(
+ PartFileInfo<String> partFileState, long currentTime) {
+ return currentTime - partFileState.getCreationTime() >=
rollingTimeInterval;
+ }
+ }
+
+ private static class ProjectionEncoder implements Encoder<RowData> {
+
+ private final Encoder<RowData> encoder;
+ private final RowDataPartitionComputer computer;
+
+ private ProjectionEncoder(Encoder<RowData> encoder,
RowDataPartitionComputer computer) {
+ this.encoder = encoder;
+ this.computer = computer;
+ }
+
+ @Override
+ public void encode(RowData element, OutputStream stream) throws
IOException {
+ encoder.encode(computer.projectColumnsToWrite(element), stream);
+ }
+ }
+
+ /**
+ * Project row to non-partition fields.
+ */
+ public static class ProjectionBulkFactory implements
BulkWriter.Factory<RowData> {
+
+ private final BulkWriter.Factory<RowData> factory;
+ private final RowDataPartitionComputer computer;
+
+ public ProjectionBulkFactory(
+ BulkWriter.Factory<RowData> factory, RowDataPartitionComputer
computer) {
+ this.factory = factory;
+ this.computer = computer;
+ }
+
+ @Override
+ public BulkWriter<RowData> create(FSDataOutputStream out) throws
IOException {
+ BulkWriter<RowData> writer = factory.create(out);
+ return new BulkWriter<RowData>() {
+
+ @Override
+ public void addElement(RowData element) throws IOException {
+ writer.addElement(computer.projectColumnsToWrite(element));
+ }
+
+ @Override
+ public void flush() throws IOException {
+ writer.flush();
+ }
+
+ @Override
+ public void finish() throws IOException {
+ writer.finish();
+ }
+ };
+ }
+ }
+}
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
new file mode 100644
index 000000000..47516a270
--- /dev/null
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem.stream;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+/**
+ * Operator for file system sink. It is a operator version of {@link
StreamingFileSink}. It can send
+ * file and bucket information to downstream.
+ */
+public abstract class AbstractStreamingWriter<IN, OUT> extends
AbstractStreamOperator<OUT>
+ implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+
+ // ------------------------ configuration fields --------------------------
+
+ private final long bucketCheckInterval;
+
+ private final StreamingFileSink.BucketsBuilder<
+ IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String,
?>>
+ bucketsBuilder;
+
+ private String inlongMetric;
+
+ private String inlongAudit;
+
+ // --------------------------- runtime fields -----------------------------
+
+ private transient Buckets<IN, String> buckets;
+
+ private transient StreamingFileSinkHelper<IN> helper;
+
+ private transient long currentWatermark;
+
+ private SinkMetricData metricData;
+
+ public AbstractStreamingWriter(
+ long bucketCheckInterval,
+ StreamingFileSink.BucketsBuilder<
+ IN, String, ? extends StreamingFileSink.BucketsBuilder<IN,
String, ?>>
+ bucketsBuilder, String inlongMetric, String inlongAudit) {
+ this.bucketCheckInterval = bucketCheckInterval;
+ this.bucketsBuilder = bucketsBuilder;
+ this.inlongMetric = inlongMetric;
+ this.inlongAudit = inlongAudit;
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+
+ /**
+ * Notifies a partition created.
+ */
+ protected abstract void partitionCreated(String partition);
+
+ /**
+ * Notifies a partition become inactive. A partition becomes inactive
after all the records
+ * received so far have been committed.
+ */
+ protected abstract void partitionInactive(String partition);
+
+ /**
+ * Notifies a new file has been opened.
+ *
+ * <p>Note that this does not mean that the file has been created in the
file system. It is only
+ * created logically and the actual file will be generated after it is
committed.
+ */
+ protected abstract void onPartFileOpened(String partition, Path newPath);
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ if (inlongMetric != null) {
+ String[] inLongMetricArray = inlongMetric.split(DELIMITER);
+ String groupId = inLongMetricArray[0];
+ String streamId = inLongMetricArray[1];
+ String nodeId = inLongMetricArray[2];
+ metricData = new SinkMetricData(
+ groupId, streamId, nodeId,
getRuntimeContext().getMetricGroup(), inlongAudit);
+ metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
+ metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
+ metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
+ metricData.registerMetricsForNumRecordsOut(new
ThreadSafeCounter());
+ metricData.registerMetricsForNumBytesOutPerSecond();
+ metricData.registerMetricsForNumRecordsOutPerSecond();
+ }
+ }
+
+ /**
+ * Commit up to this checkpoint id.
+ */
+ protected void commitUpToCheckpoint(long checkpointId) throws Exception {
+ helper.commitUpToCheckpoint(checkpointId);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+ buckets =
bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
+
+ // Set listener before the initialization of Buckets.
+ buckets.setBucketLifeCycleListener(
+ new BucketLifeCycleListener<IN, String>() {
+
+ @Override
+ public void bucketCreated(Bucket<IN, String> bucket) {
+
AbstractStreamingWriter.this.partitionCreated(bucket.getBucketId());
+ }
+
+ @Override
+ public void bucketInactive(Bucket<IN, String> bucket) {
+
AbstractStreamingWriter.this.partitionInactive(bucket.getBucketId());
+ }
+ });
+
+
buckets.setFileLifeCycleListener(AbstractStreamingWriter.this::onPartFileOpened);
+
+ helper =
+ new StreamingFileSinkHelper<>(
+ buckets,
+ context.isRestored(),
+ context.getOperatorStateStore(),
+ getRuntimeContext().getProcessingTimeService(),
+ bucketCheckInterval);
+
+ currentWatermark = Long.MIN_VALUE;
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ helper.snapshotState(context.getCheckpointId());
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ currentWatermark = mark.getTimestamp();
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ helper.onElement(
+ element.getValue(),
+ getProcessingTimeService().getCurrentProcessingTime(),
+ element.hasTimestamp() ? element.getTimestamp() : null,
+ currentWatermark);
+ if (metricData != null) {
+ metricData.invokeWithEstimate(element.getValue());
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+ commitUpToCheckpoint(checkpointId);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ buckets.onProcessingTime(Long.MAX_VALUE);
+ helper.snapshotState(Long.MAX_VALUE);
+ output.emitWatermark(new Watermark(Long.MAX_VALUE));
+ commitUpToCheckpoint(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ super.dispose();
+ if (helper != null) {
+ helper.close();
+ }
+ }
+}
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
new file mode 100644
index 000000000..bf0c15ca6
--- /dev/null
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem.stream;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Writer for emitting {@link PartitionCommitInfo} to downstream.
+ */
+public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN,
PartitionCommitInfo> {
+
+ private static final long serialVersionUID = 2L;
+
+ private transient Set<String> currentNewPartitions;
+ private transient TreeMap<Long, Set<String>> newPartitions;
+ private transient Set<String> committablePartitions;
+
+ public StreamingFileWriter(
+ long bucketCheckInterval,
+ StreamingFileSink.BucketsBuilder<
+ IN, String, ? extends StreamingFileSink.BucketsBuilder<IN,
String, ?>>
+ bucketsBuilder, String inlongMetric, String inlongAudit) {
+ super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ currentNewPartitions = new HashSet<>();
+ newPartitions = new TreeMap<>();
+ committablePartitions = new HashSet<>();
+ super.initializeState(context);
+ }
+
+ @Override
+ protected void partitionCreated(String partition) {
+ currentNewPartitions.add(partition);
+ }
+
+ @Override
+ protected void partitionInactive(String partition) {
+ committablePartitions.add(partition);
+ }
+
+ @Override
+ protected void onPartFileOpened(String s, Path newPath) {
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ newPartitions.put(context.getCheckpointId(), new
HashSet<>(currentNewPartitions));
+ currentNewPartitions.clear();
+ }
+
+ @Override
+ protected void commitUpToCheckpoint(long checkpointId) throws Exception {
+ super.commitUpToCheckpoint(checkpointId);
+
+ NavigableMap<Long, Set<String>> headPartitions =
+ this.newPartitions.headMap(checkpointId, true);
+ Set<String> partitions = new HashSet<>(committablePartitions);
+ committablePartitions.clear();
+ headPartitions.values().forEach(partitions::addAll);
+ headPartitions.clear();
+
+ output.collect(
+ new StreamRecord<>(
+ new PartitionCommitInfo(
+ checkpointId,
+ getRuntimeContext().getIndexOfThisSubtask(),
+
getRuntimeContext().getNumberOfParallelSubtasks(),
+ new ArrayList<>(partitions))));
+ }
+}
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
new file mode 100644
index 000000000..6f7e8fb8a
--- /dev/null
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem.stream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.filesystem.FileSystemFactory;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+import org.apache.flink.table.filesystem.stream.PartitionCommitter;
+import org.apache.flink.table.filesystem.stream.compact.CompactBucketWriter;
+import org.apache.flink.table.filesystem.stream.compact.CompactCoordinator;
+import
org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
+import
org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.table.filesystem.stream.compact.CompactOperator;
+import org.apache.flink.table.filesystem.stream.compact.CompactReader;
+import org.apache.flink.table.filesystem.stream.compact.CompactWriter;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.inlong.sort.filesystem.stream.compact.CompactFileWriter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static
org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
+
+/**
+ * Helper for creating streaming file sink.
+ */
+public class StreamingSink {
+
+ private StreamingSink() {
+ }
+
+ /**
+ * Create a file writer by input stream. This is similar to {@link
StreamingFileSink}, in
+ * addition, it can emit {@link PartitionCommitInfo} to down stream.
+ */
+ public static <T> DataStream<PartitionCommitInfo> writer(
+ DataStream<T> inputStream,
+ long bucketCheckInterval,
+ StreamingFileSink.BucketsBuilder<
+ T, String, ? extends StreamingFileSink.BucketsBuilder<T,
String, ?>>
+ bucketsBuilder,
+ int parallelism, String inlongMetric, String inlongAudit) {
+ StreamingFileWriter<T> fileWriter =
+ new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder,
inlongMetric, inlongAudit);
+ return inputStream
+ .transform(
+ StreamingFileWriter.class.getSimpleName(),
+ TypeInformation.of(PartitionCommitInfo.class),
+ fileWriter)
+ .setParallelism(parallelism);
+ }
+
+ /**
+ * Create a file writer with compaction operators by input stream. In
addition, it can emit
+ * {@link PartitionCommitInfo} to down stream.
+ */
+ public static <T> DataStream<PartitionCommitInfo> compactionWriter(
+ DataStream<T> inputStream,
+ long bucketCheckInterval,
+ StreamingFileSink.BucketsBuilder<
+ T, String, ? extends StreamingFileSink.BucketsBuilder<T,
String, ?>>
+ bucketsBuilder,
+ FileSystemFactory fsFactory,
+ Path path,
+ CompactReader.Factory<T> readFactory,
+ long targetFileSize,
+ int parallelism, String inlongMetric, String inlongAudit) {
+ CompactFileWriter<T> writer = new
CompactFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric,
+ inlongAudit);
+
+ SupplierWithException<FileSystem, IOException> fsSupplier =
+ (SupplierWithException<FileSystem, IOException> & Serializable)
+ () -> fsFactory.create(path.toUri());
+
+ CompactCoordinator coordinator = new CompactCoordinator(fsSupplier,
targetFileSize);
+
+ SingleOutputStreamOperator<CoordinatorOutput> coordinatorOp =
+ inputStream
+ .transform(
+ "streaming-writer",
+ TypeInformation.of(CoordinatorInput.class),
+ writer)
+ .setParallelism(parallelism)
+ .transform(
+ "compact-coordinator",
+ TypeInformation.of(CoordinatorOutput.class),
+ coordinator)
+ .setParallelism(1)
+ .setMaxParallelism(1);
+
+ CompactWriter.Factory<T> writerFactory =
+ CompactBucketWriter.factory(
+ (SupplierWithException<BucketWriter<T, String>,
IOException> & Serializable)
+ bucketsBuilder::createBucketWriter);
+
+ CompactOperator<T> compacter =
+ new CompactOperator<>(fsSupplier, readFactory, writerFactory);
+
+ return coordinatorOp
+ .broadcast()
+ .transform(
+ "compact-operator",
+ TypeInformation.of(PartitionCommitInfo.class),
+ compacter)
+ .setParallelism(parallelism);
+ }
+
+ /**
+ * Create a sink from file writer. Decide whether to add the node to
commit partitions according
+ * to options.
+ */
+ public static DataStreamSink<?> sink(
+ DataStream<PartitionCommitInfo> writer,
+ Path locationPath,
+ ObjectIdentifier identifier,
+ List<String> partitionKeys,
+ TableMetaStoreFactory msFactory,
+ FileSystemFactory fsFactory,
+ Configuration options) {
+ DataStream<?> stream = writer;
+ if (partitionKeys.size() > 0 &&
options.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
+ PartitionCommitter committer =
+ new PartitionCommitter(
+ locationPath, identifier, partitionKeys,
msFactory, fsFactory, options);
+ stream =
+ writer.transform(
+ PartitionCommitter.class.getSimpleName(),
Types.VOID, committer)
+ .setParallelism(1)
+ .setMaxParallelism(1);
+ }
+
+ return stream.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
+ }
+}
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
new file mode 100644
index 000000000..06018b9ef
--- /dev/null
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem.stream.compact;
+
+import org.apache.flink.core.fs.Path;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import
org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
+import
org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
+import
org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;
+import org.apache.inlong.sort.filesystem.stream.AbstractStreamingWriter;
+
+/**
+ * Writer for emitting {@link InputFile} and {@link EndCheckpoint} to downstream.
+ *
+ * <p>Every newly opened part file is reported downstream as an {@link InputFile}. After the parent
+ * writer's checkpoint commit, an {@link EndCheckpoint} is emitted. It carries the checkpoint id,
+ * this subtask's index and the number of parallel subtasks.
+ *
+ * @param <T> type of the records written by the buckets
+ */
+public class CompactFileWriter<T>
+        extends AbstractStreamingWriter<T, CoordinatorInput> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a compact file writer; all arguments are forwarded to {@link AbstractStreamingWriter}.
+     *
+     * @param bucketCheckInterval bucket check interval
+     * @param bucketsBuilder builder of the buckets that write the part files
+     * @param inlongMetric inlong metric option
+     * @param inlongAudit inlong audit option
+     */
+    public CompactFileWriter(
+            long bucketCheckInterval,
+            StreamingFileSink.BucketsBuilder<
+                            T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
+                    bucketsBuilder,
+            String inlongMetric,
+            String inlongAudit) {
+        super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
+    }
+
+    /**
+     * Intentionally a no-op: this writer only reports opened part files and checkpoint ends.
+     */
+    @Override
+    protected void partitionCreated(String partition) {
+    }
+
+    /**
+     * Intentionally a no-op: this writer only reports opened part files and checkpoint ends.
+     */
+    @Override
+    protected void partitionInactive(String partition) {
+    }
+
+    /**
+     * Reports a newly opened part file downstream as an {@link InputFile}.
+     */
+    @Override
+    protected void onPartFileOpened(String partition, Path newPath) {
+        output.collect(new StreamRecord<>(new InputFile(partition, newPath)));
+    }
+
+    /**
+     * Delegates to the parent implementation, then emits an {@link EndCheckpoint} for
+     * {@code checkpointId} tagged with this subtask's index and the number of parallel subtasks.
+     */
+    @Override
+    protected void commitUpToCheckpoint(long checkpointId) throws Exception {
+        super.commitUpToCheckpoint(checkpointId);
+        output.collect(
+                new StreamRecord<>(
+                        new EndCheckpoint(
+                                checkpointId,
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                getRuntimeContext().getNumberOfParallelSubtasks())));
+    }
+}
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/filesystem/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..142b40bfb
--- /dev/null
+++ b/inlong-sort/sort-connectors/filesystem/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.filesystem.FileSystemTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index 14eb9f268..2b785a462 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -54,6 +54,7 @@
<module>elasticsearch-7</module>
<module>redis</module>
<module>tubemq</module>
+ <module>filesystem</module>
</modules>
<dependencies>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 925296280..94282be0c 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -211,6 +211,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-filesystem</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
new file mode 100644
index 000000000..178b8370f
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link FileSystemLoadNode}
+ */
+public class FilesystemSqlParserTest {
+
+    /**
+     * Builds the MySQL extract node (id "1") with the fields {@code age} (long) and
+     * {@code name} (string).
+     *
+     * @return Mysql extract node
+     */
+    private MySqlExtractNode buildMySQLExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, null,
+                Collections.singletonList("user"), "localhost", "root", "inlong",
+                "test", null, null,
+                false, null);
+    }
+
+    /**
+     * Builds the filesystem load node (id "2") that writes {@code name} and {@code age} as csv to
+     * {@code hdfs://127.0.0.1:8020/}, with the {@code sink.ignore.changelog} property set to "true".
+     *
+     * @return filesystem load node
+     */
+    private FileSystemLoadNode buildFileSystemLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new LongFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new LongFormatInfo()))
+                );
+        Map<String, String> map = new HashMap<>();
+        map.put("sink.ignore.changelog", "true");
+        return new FileSystemLoadNode("2", "filesystem_node", fields, relations, null,
+                "hdfs://127.0.0.1:8020/", "csv", 1, map, null, null);
+    }
+
+    /**
+     * Builds a node relation from the ids of the given input and output nodes.
+     *
+     * @param inputs extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    /**
+     * Test flink sql task for extract is mysql {@link MySqlExtractNode} and load is filesystem
+     * {@link FileSystemLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testFlinkSqlParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode();
+        Node outputNode = buildFileSystemLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 1363253d1..38eed8f85 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -528,6 +528,16 @@
Source : org.apache.bahir:flink-connector-redis_2.11:1.1-SNAPSHOT (Please
note that the software have been modified.)
License : https://github.com/apache/bahir-flink/blob/master/LICENSE
+ 1.3.10 inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
+        inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
+        inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/AbstractFileSystemTable.java
+        inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
+        inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
+        inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+        inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
+ Source : org.apache.flink:flink-table-runtime-blink_2.11:1.13.2 (Please note that the software have been modified.)
+ License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
+
=======================================================================
Apache InLong Subcomponents:
diff --git a/licenses/inlong-sort-connectors/NOTICE b/licenses/inlong-sort-connectors/NOTICE
index 75d6375ee..410c311bb 100644
--- a/licenses/inlong-sort-connectors/NOTICE
+++ b/licenses/inlong-sort-connectors/NOTICE
@@ -2559,4 +2559,19 @@ Apache Parquet Hadoop NOTICE
The NOTICE content is too long and is included at notices/NOTICE-[project].txt
+// ------------------------------------------------------------------
+// NOTICE file corresponding to the section 4d of The Apache License,
+// Version 2.0, in this case for Apache Flink
+// ------------------------------------------------------------------
+
+Apache Flink
+Copyright 2006-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Flink : Table : Runtime Blink
+Copyright 2014-2021 The Apache Software Foundation
+