This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f4f306657 [INLONG-5587][Sort] Add filesystem connector module for 
inlong metric and inlong audit (#5591)
f4f306657 is described below

commit f4f30665701d7833183bf7a66d1123e705f7da7e
Author: Xin Gong <[email protected]>
AuthorDate: Fri Aug 19 17:46:25 2022 +0800

    [INLONG-5587][Sort] Add filesystem connector module for inlong metric and 
inlong audit (#5591)
---
 .../src/main/assemblies/sort-connectors.xml        |   8 +
 .../protocol/node/load/FileSystemLoadNode.java     |   5 +-
 .../org/apache/inlong/sort/base/Constants.java     |   6 +
 inlong-sort/sort-connectors/filesystem/pom.xml     |  84 +++
 .../sort/filesystem/AbstractFileSystemTable.java   |  73 +++
 .../sort/filesystem/FileSystemTableFactory.java    | 217 +++++++
 .../sort/filesystem/FileSystemTableSink.java       | 685 +++++++++++++++++++++
 .../filesystem/stream/AbstractStreamingWriter.java | 206 +++++++
 .../filesystem/stream/StreamingFileWriter.java     | 101 +++
 .../sort/filesystem/stream/StreamingSink.java      | 163 +++++
 .../stream/compact/CompactFileWriter.java          |  68 ++
 .../org.apache.flink.table.factories.Factory       |  19 +
 inlong-sort/sort-connectors/pom.xml                |   1 +
 inlong-sort/sort-core/pom.xml                      |   6 +
 .../sort/parser/FilesystemSqlParserTest.java       | 129 ++++
 licenses/inlong-sort-connectors/LICENSE            |  10 +
 licenses/inlong-sort-connectors/NOTICE             |  15 +
 17 files changed, 1794 insertions(+), 2 deletions(-)

diff --git a/inlong-distribution/src/main/assemblies/sort-connectors.xml 
b/inlong-distribution/src/main/assemblies/sort-connectors.xml
index 674ea90a5..22bfd7526 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors.xml
@@ -147,6 +147,14 @@
             </includes>
             <fileMode>0644</fileMode>
         </fileSet>
+        <fileSet>
+            
<directory>../inlong-sort/sort-connectors/filesystem/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                
<include>sort-connector-filesystem-${project.version}.jar</include>
+            </includes>
+            <fileMode>0644</fileMode>
+        </fileSet>
         <!-- module's 3td-licenses, notices-->
         <fileSet>
             <directory>../licenses/inlong-sort-connectors</directory>
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
index 31224ab1f..43e309254 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
@@ -39,7 +40,7 @@ import java.util.Map;
 @JsonTypeName("fileSystemLoad")
 @Data
 @NoArgsConstructor
-public class FileSystemLoadNode extends LoadNode implements Serializable {
+public class FileSystemLoadNode extends LoadNode implements InlongMetric, 
Serializable {
 
     private static final long serialVersionUID = -4836034838166667371L;
 
@@ -90,7 +91,7 @@ public class FileSystemLoadNode extends LoadNode implements 
Serializable {
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> map = super.tableOptions();
-        map.put("connector", "filesystem");
+        map.put("connector", "filesystem-inlong");
         map.put("path", path);
         map.put("format", format);
         if (null != partitionFields) {
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 19b1a90f4..9dd124284 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -88,4 +88,10 @@ public final class Constants {
             .noDefaultValue()
             .withDescription("INLONG AUDIT HOST + '&' + PORT");
 
+    public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
+            ConfigOptions.key("sink.ignore.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Regard upsert delete as insert kind.");
+
 }
diff --git a/inlong-sort/sort-connectors/filesystem/pom.xml 
b/inlong-sort/sort-connectors/filesystem/pom.xml
new file mode 100644
index 000000000..f14c30087
--- /dev/null
+++ b/inlong-sort/sort-connectors/filesystem/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~   Licensed to the Apache Software Foundation (ASF) under one
+  ~   or more contributor license agreements.  See the NOTICE file
+  ~   distributed with this work for additional information
+  ~   regarding copyright ownership.  The ASF licenses this file
+  ~   to you under the Apache License, Version 2.0 (the
+  ~   "License"); you may not use this file except in compliance
+  ~   with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>sort-connectors</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sort-connector-filesystem</artifactId>
+    <name>Apache InLong - Sort-connector-filesystem</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    
<artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    
<pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>
+                                        
org.apache.inlong.sort.filesystem.shaded.org.apache.inlong.sort.base
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/AbstractFileSystemTable.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/AbstractFileSystemTable.java
new file mode 100644
index 000000000..a9baf0a80
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/AbstractFileSystemTable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.util.List;
+
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_DEFAULT_NAME;
+import static org.apache.flink.table.filesystem.FileSystemOptions.PATH;
+
+/** Abstract File system table for providing some common methods. */
+abstract class AbstractFileSystemTable {
+
+    final DynamicTableFactory.Context context;
+    final ObjectIdentifier tableIdentifier;
+    final Configuration tableOptions;
+    final TableSchema schema;
+    final List<String> partitionKeys;
+    final Path path;
+    final String defaultPartName;
+
+    AbstractFileSystemTable(DynamicTableFactory.Context context) {
+        this.context = context;
+        this.tableIdentifier = context.getObjectIdentifier();
+        this.tableOptions = new Configuration();
+        
context.getCatalogTable().getOptions().forEach(tableOptions::setString);
+        this.schema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        this.partitionKeys = context.getCatalogTable().getPartitionKeys();
+        this.path = new Path(tableOptions.get(PATH));
+        this.defaultPartName = tableOptions.get(PARTITION_DEFAULT_NAME);
+    }
+
+    ReadableConfig formatOptions(String identifier) {
+        return new DelegatingConfiguration(tableOptions, identifier + ".");
+    }
+
+    DataType getFormatDataType() {
+        TableSchema.Builder builder = TableSchema.builder();
+        schema.getTableColumns()
+                .forEach(
+                        column -> {
+                            if (!partitionKeys.contains(column.getName())) {
+                                builder.add(column);
+                            }
+                        });
+        return builder.build().toRowDataType();
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
new file mode 100644
index 000000000..58c585ebc
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
@@ -0,0 +1,217 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.BulkReaderFormatFactory;
+import org.apache.flink.table.factories.BulkWriterFormatFactory;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.filesystem.FileSystemOptions;
+import org.apache.flink.table.filesystem.FileSystemTableSource;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.time.ZoneId.SHORT_IDS;
+import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * File system {@link TableFactory}.
+ *
+ * <p>1.The partition information should be in the file system path, whether 
it's a temporary table
+ * or a catalog table. 2.Support insert into (append) and insert overwrite. 
3.Support static and
+ * dynamic partition inserting.
+ *
+ * copy from flink-table-runtime-blink:1.13.2-rc2
+ * Add inlong metric option and inlong audit option for computing metric
+ */
+public class FileSystemTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "filesystem-inlong";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        validate(helper);
+        return new FileSystemTableSource(
+                context,
+                discoverDecodingFormat(context, BulkReaderFormatFactory.class),
+                discoverDecodingFormat(context, 
DeserializationFormatFactory.class),
+                discoverFormatFactory(context));
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        validate(helper);
+        return new FileSystemTableSink(
+                context,
+                discoverDecodingFormat(context, BulkReaderFormatFactory.class),
+                discoverDecodingFormat(context, 
DeserializationFormatFactory.class),
+                discoverFormatFactory(context),
+                discoverEncodingFormat(context, BulkWriterFormatFactory.class),
+                discoverEncodingFormat(context, 
SerializationFormatFactory.class));
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FileSystemOptions.PATH);
+        options.add(FactoryUtil.FORMAT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FileSystemOptions.PARTITION_DEFAULT_NAME);
+        options.add(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE);
+        options.add(FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL);
+        options.add(FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL);
+        options.add(FileSystemOptions.SINK_SHUFFLE_BY_PARTITION);
+        options.add(FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND);
+        options.add(FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS);
+        
options.add(FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+        options.add(FileSystemOptions.SINK_PARTITION_COMMIT_TRIGGER);
+        options.add(FileSystemOptions.SINK_PARTITION_COMMIT_DELAY);
+        
options.add(FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE);
+        options.add(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
+        options.add(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_CLASS);
+        options.add(FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
+        options.add(FileSystemOptions.AUTO_COMPACTION);
+        options.add(FileSystemOptions.COMPACTION_FILE_SIZE);
+        options.add(FileSystemOptions.SINK_PARALLELISM);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
+        options.add(IGNORE_ALL_CHANGELOG);
+        return options;
+    }
+
+    private void validate(FactoryUtil.TableFactoryHelper helper) {
+        // Except format options, some formats like parquet and orc can not 
list all supported
+        // options.
+        helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + 
".");
+
+        // validate time zone of watermark
+        String watermarkTimeZone =
+                helper.getOptions()
+                        
.get(FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE);
+        if (watermarkTimeZone.startsWith("UTC+")
+                || watermarkTimeZone.startsWith("UTC-")
+                || SHORT_IDS.containsKey(watermarkTimeZone)) {
+            throw new ValidationException(
+                    String.format(
+                            "The supported watermark time zone is either a 
full name such as 'America/Los_Angeles',"
+                                    + " or a custom time zone id such as 
'GMT-08:00', but configured time zone is "
+                                    + "'%s'.",
+                            watermarkTimeZone));
+        }
+    }
+
+    private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> 
discoverDecodingFormat(
+            Context context, Class<F> formatFactoryClass) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        if (formatFactoryExists(context, formatFactoryClass)) {
+            return helper.discoverDecodingFormat(formatFactoryClass, 
FactoryUtil.FORMAT);
+        } else {
+            return null;
+        }
+    }
+
+    private <I, F extends EncodingFormatFactory<I>> EncodingFormat<I> 
discoverEncodingFormat(
+            Context context, Class<F> formatFactoryClass) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        if (formatFactoryExists(context, formatFactoryClass)) {
+            return helper.discoverEncodingFormat(formatFactoryClass, 
FactoryUtil.FORMAT);
+        } else {
+            return null;
+        }
+    }
+
+    private FileSystemFormatFactory discoverFormatFactory(Context context) {
+        if (formatFactoryExists(context, FileSystemFormatFactory.class)) {
+            Configuration options = 
Configuration.fromMap(context.getCatalogTable().getOptions());
+            String identifier = options.get(FactoryUtil.FORMAT);
+            return FactoryUtil.discoverFactory(
+                    Thread.currentThread().getContextClassLoader(),
+                    FileSystemFormatFactory.class,
+                    identifier);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns true if the format factory can be found using the given factory 
base class and
+     * identifier.
+     */
+    private boolean formatFactoryExists(Context context, Class<?> 
factoryClass) {
+        Configuration options = 
Configuration.fromMap(context.getCatalogTable().getOptions());
+        String identifier = options.get(FactoryUtil.FORMAT);
+        if (identifier == null) {
+            throw new ValidationException(
+                    String.format(
+                            "Table options do not contain an option key '%s' 
for discovering a format.",
+                            FactoryUtil.FORMAT.key()));
+        }
+
+        final List<Factory> factories = new LinkedList<>();
+        ServiceLoader.load(Factory.class, context.getClassLoader())
+                .iterator()
+                .forEachRemaining(factories::add);
+
+        final List<Factory> foundFactories =
+                factories.stream()
+                        .filter(f -> 
factoryClass.isAssignableFrom(f.getClass()))
+                        .collect(Collectors.toList());
+
+        final List<Factory> matchingFactories =
+                foundFactories.stream()
+                        .filter(f -> f.factoryIdentifier().equals(identifier))
+                        .collect(Collectors.toList());
+
+        return !matchingFactories.isEmpty();
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
new file mode 100644
index 000000000..60bcf9332
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
@@ -0,0 +1,685 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.filesystem.DeserializationSchemaAdapter;
+import org.apache.flink.table.filesystem.EmptyMetaStoreFactory;
+import org.apache.flink.table.filesystem.FileSystemFactory;
+import org.apache.flink.table.filesystem.FileSystemOptions;
+import org.apache.flink.table.filesystem.FileSystemOutputFormat;
+import org.apache.flink.table.filesystem.OutputFormatFactory;
+import org.apache.flink.table.filesystem.PartitionComputer;
+import org.apache.flink.table.filesystem.RowDataPartitionComputer;
+import org.apache.flink.table.filesystem.SerializationSchemaAdapter;
+import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+import org.apache.flink.table.filesystem.stream.compact.CompactBulkReader;
+import org.apache.flink.table.filesystem.stream.compact.CompactReader;
+import 
org.apache.flink.table.filesystem.stream.compact.FileInputFormatCompactReader;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
+import org.apache.inlong.sort.filesystem.stream.StreamingSink;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL;
+import static 
org.apache.flink.table.filesystem.stream.compact.CompactOperator.convertToUncompacted;
+import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * File system {@link DynamicTableSink}.
+ */
/**
 * Filesystem {@link DynamicTableSink}. Writes {@link RowData} records to a filesystem path,
 * supporting static and dynamic partitioning as well as INSERT OVERWRITE. Bounded (batch) input
 * is written through a {@link FileSystemOutputFormat} into a staging directory; unbounded
 * (streaming) input goes through {@link StreamingFileSink} buckets with rolling policies,
 * optional auto-compaction, and partition committing.
 *
 * <p>NOTE(review): this class appears to be adapted from Flink's built-in FileSystemTableSink,
 * extended to read the inlong metric/audit options and forward them to the streaming writers —
 * confirm the upstream Flink version it was copied from when rebasing.
 */
public class FileSystemTableSink extends AbstractFileSystemTable
        implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

    // For compaction reading: compaction needs to read back the small files it merges; exactly
    // which decoder is used depends on which of these three was resolved by the factory.
    @Nullable
    private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat;
    @Nullable
    private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat;
    @Nullable
    private final FileSystemFormatFactory formatFactory;

    // For Writing: at most one of these is used; see createWriter(Context).
    @Nullable
    private final EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat;
    @Nullable
    private final EncodingFormat<SerializationSchema<RowData>> serializationFormat;

    // Mutated by the SupportsOverwrite / SupportsPartitioning callbacks before getSinkRuntimeProvider.
    private boolean overwrite = false;
    private boolean dynamicGrouping = false;
    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();

    // Explicit sink parallelism from the 'sink.parallelism' option; null means "inherit input parallelism".
    @Nullable
    private Integer configuredParallelism;

    // inlong metric label and audit address, read from the table options; may be null when unset.
    private String inlongMetric;
    private String inlongAudit;

    /**
     * Creates the sink. At least one of {@code bulkWriterFormat}, {@code serializationFormat} or
     * {@code formatFactory} must be non-null; otherwise no format factory could be found for the
     * configured identifier and a {@link ValidationException} is thrown.
     */
    FileSystemTableSink(
            DynamicTableFactory.Context context,
            @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat,
            @Nullable FileSystemFormatFactory formatFactory,
            @Nullable EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat,
            @Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat) {
        super(context);
        this.bulkReaderFormat = bulkReaderFormat;
        this.deserializationFormat = deserializationFormat;
        this.formatFactory = formatFactory;
        if (Stream.of(bulkWriterFormat, serializationFormat, formatFactory)
                .allMatch(Objects::isNull)) {
            Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
            String identifier = options.get(FactoryUtil.FORMAT);
            throw new ValidationException(
                    String.format(
                            "Could not find any format factory for identifier '%s' in the classpath.",
                            identifier));
        }
        this.bulkWriterFormat = bulkWriterFormat;
        this.serializationFormat = serializationFormat;
        this.configuredParallelism = tableOptions.get(FileSystemOptions.SINK_PARALLELISM);
        this.inlongMetric = tableOptions.get(INLONG_METRIC);
        this.inlongAudit = tableOptions.get(INLONG_AUDIT);
        // Audit reporting requires the metric option to be configured as well.
        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
        return (DataStreamSinkProvider) dataStream -> consume(dataStream, sinkContext);
    }

    /** Dispatches to the batch or streaming sink depending on input boundedness. */
    private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) {
        final int inputParallelism = dataStream.getParallelism();
        final int parallelism = Optional.ofNullable(configuredParallelism).orElse(inputParallelism);

        if (sinkContext.isBounded()) {
            return createBatchSink(dataStream, sinkContext, parallelism);
        } else {
            // INSERT OVERWRITE is only meaningful for bounded jobs.
            if (overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }

            return createStreamingSink(dataStream, sinkContext, parallelism);
        }
    }

    /** Computes partition values from rows, based on the table's partition keys. */
    private RowDataPartitionComputer partitionComputer() {
        return new RowDataPartitionComputer(
                defaultPartName,
                schema.getFieldNames(),
                schema.getFieldDataTypes(),
                partitionKeys.toArray(new String[0]));
    }

    /** Batch path: write into a random-prefixed staging dir via FileSystemOutputFormat. */
    private DataStreamSink<RowData> createBatchSink(
            DataStream<RowData> inputStream, Context sinkContext, final int parallelism) {
        FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
        builder.setPartitionComputer(partitionComputer());
        builder.setDynamicGrouped(dynamicGrouping);
        builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
        builder.setFormatFactory(createOutputFormatFactory(sinkContext));
        builder.setMetaStoreFactory(new EmptyMetaStoreFactory(path));
        builder.setOverwrite(overwrite);
        builder.setStaticPartitions(staticPartitions);
        builder.setTempPath(toStagingPath());
        builder.setOutputFileConfig(
                OutputFileConfig.builder()
                        .withPartPrefix("part-" + UUID.randomUUID().toString())
                        .build());
        return inputStream
                .writeUsingOutputFormat(builder.build())
                .setParallelism(parallelism)
                .name("Filesystem");
    }

    /**
     * Streaming path: build a StreamingFileSink buckets builder (row or bulk format), wire in
     * rolling policies and optional auto-compaction, then hand off to {@link StreamingSink},
     * passing the inlong metric/audit options through to the writer operators.
     */
    private DataStreamSink<?> createStreamingSink(
            DataStream<RowData> dataStream, Context sinkContext, final int parallelism) {
        FileSystemFactory fsFactory = FileSystem::get;
        RowDataPartitionComputer computer = partitionComputer();

        boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION);
        Object writer = createWriter(sinkContext);
        boolean isEncoder = writer instanceof Encoder;
        TableBucketAssigner assigner = new TableBucketAssigner(computer);
        // Bulk writers (and anything feeding the compactor) must roll on checkpoint so that
        // in-progress files become visible/compactable at checkpoint boundaries.
        TableRollingPolicy rollingPolicy =
                new TableRollingPolicy(
                        !isEncoder || autoCompaction,
                        tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                        tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

        String randomPrefix = "part-" + UUID.randomUUID().toString();
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder();
        // With compaction enabled, files are first written under an "uncompacted" prefix and
        // renamed when compaction finishes.
        fileNamingBuilder =
                autoCompaction
                        ? fileNamingBuilder.withPartPrefix(convertToUncompacted(randomPrefix))
                        : fileNamingBuilder.withPartPrefix(randomPrefix);
        OutputFileConfig fileNamingConfig = fileNamingBuilder.build();

        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
        if (isEncoder) {
            //noinspection unchecked
            bucketsBuilder =
                    StreamingFileSink.forRowFormat(
                                    path,
                                    new ProjectionEncoder((Encoder<RowData>) writer, computer))
                            .withBucketAssigner(assigner)
                            .withOutputFileConfig(fileNamingConfig)
                            .withRollingPolicy(rollingPolicy);
        } else {
            //noinspection unchecked
            bucketsBuilder =
                    StreamingFileSink.forBulkFormat(
                                    path,
                                    new ProjectionBulkFactory(
                                            (BulkWriter.Factory<RowData>) writer, computer))
                            .withBucketAssigner(assigner)
                            .withOutputFileConfig(fileNamingConfig)
                            .withRollingPolicy(rollingPolicy);
        }

        long bucketCheckInterval = tableOptions.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();

        DataStream<PartitionCommitInfo> writerStream;
        if (autoCompaction) {
            // Target size for compacted files; falls back to the rolling file size.
            long compactionSize =
                    tableOptions
                            .getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
                            .orElse(tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE))
                            .getBytes();

            CompactReader.Factory<RowData> reader =
                    createCompactReaderFactory(sinkContext)
                            .orElseThrow(
                                    () ->
                                            new TableException(
                                                    "Please implement available reader for compaction:"
                                                            + " BulkFormat, FileInputFormat."));

            writerStream =
                    StreamingSink.compactionWriter(
                            dataStream,
                            bucketCheckInterval,
                            bucketsBuilder,
                            fsFactory,
                            path,
                            reader,
                            compactionSize,
                            parallelism,
                            inlongMetric,
                            inlongAudit);
        } else {
            writerStream =
                    StreamingSink.writer(
                            dataStream, bucketCheckInterval, bucketsBuilder, parallelism, inlongMetric, inlongAudit);
        }

        return StreamingSink.sink(
                writerStream,
                path,
                tableIdentifier,
                partitionKeys,
                new EmptyMetaStoreFactory(path),
                fsFactory,
                tableOptions);
    }

    /**
     * Builds the reader used by compaction to read files back, trying the bulk reader format,
     * then the legacy format factory, then the deserialization format. Empty if none applies.
     */
    private Optional<CompactReader.Factory<RowData>> createCompactReaderFactory(Context context) {
        DataType producedDataType = schema.toRowDataType();
        if (bulkReaderFormat != null) {
            BulkFormat<RowData, FileSourceSplit> format =
                    bulkReaderFormat.createRuntimeDecoder(
                            createSourceContext(context), producedDataType);
            return Optional.of(CompactBulkReader.factory(format));
        } else if (formatFactory != null) {
            InputFormat<RowData, ?> format = formatFactory.createReader(createReaderContext());
            if (format instanceof FileInputFormat) {
                //noinspection unchecked
                return Optional.of(
                        FileInputFormatCompactReader.factory((FileInputFormat<RowData>) format));
            }
        } else if (deserializationFormat != null) {
            // NOTE, we need pass full format types to deserializationFormat
            DeserializationSchema<RowData> decoder =
                    deserializationFormat.createRuntimeDecoder(
                            createSourceContext(context), getFormatDataType());
            int[] projectedFields = IntStream.range(0, schema.getFieldCount()).toArray();
            DeserializationSchemaAdapter format =
                    new DeserializationSchemaAdapter(
                            decoder, schema, projectedFields, partitionKeys, defaultPartName);
            return Optional.of(CompactBulkReader.factory(format));
        }
        return Optional.empty();
    }

    /**
     * Adapts the sink context into a source context for the compaction reader. Only type
     * information creation is supported; data structure converters are not.
     */
    private DynamicTableSource.Context createSourceContext(Context context) {
        return new DynamicTableSource.Context() {
            @Override
            public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
                return context.createTypeInformation(producedDataType);
            }

            @Override
            public DynamicTableSource.DataStructureConverter createDataStructureConverter(
                    DataType producedDataType) {
                throw new TableException("Compaction reader not support DataStructure converter.");
            }
        };
    }

    /**
     * Reader context for the legacy {@link FileSystemFormatFactory}: full projection, no limit,
     * no pushed-down filters — compaction always reads files in their entirety.
     */
    private FileSystemFormatFactory.ReaderContext createReaderContext() {
        return new FileSystemFormatFactory.ReaderContext() {
            @Override
            public TableSchema getSchema() {
                return schema;
            }

            @Override
            public ReadableConfig getFormatOptions() {
                return formatOptions(formatFactory.factoryIdentifier());
            }

            @Override
            public List<String> getPartitionKeys() {
                return partitionKeys;
            }

            @Override
            public String getDefaultPartName() {
                return defaultPartName;
            }

            @Override
            public Path[] getPaths() {
                return new Path[]{path};
            }

            @Override
            public int[] getProjectFields() {
                return IntStream.range(0, schema.getFieldCount()).toArray();
            }

            @Override
            public long getPushedDownLimit() {
                return Long.MAX_VALUE;
            }

            @Override
            public List<ResolvedExpression> getPushedDownFilters() {
                return Collections.emptyList();
            }
        };
    }

    /** Creates (if needed) and returns a timestamped staging dir under the table path for batch writes. */
    private Path toStagingPath() {
        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
        try {
            FileSystem fs = stagingDir.getFileSystem();
            Preconditions.checkState(
                    fs.exists(stagingDir) || fs.mkdirs(stagingDir),
                    "Failed to create staging dir " + stagingDir);
            return stagingDir;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Wraps the configured writer (row Encoder or BulkWriter factory) as an OutputFormat factory. */
    @SuppressWarnings("unchecked")
    private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) {
        Object writer = createWriter(sinkContext);
        return writer instanceof Encoder
                ? path -> createEncoderOutputFormat((Encoder<RowData>) writer, path)
                : path -> createBulkWriterOutputFormat((BulkWriter.Factory<RowData>) writer, path);
    }

    /**
     * Returns either a {@link BulkWriter.Factory} or an {@link Encoder} (via
     * SerializationSchemaAdapter), depending on which encoding format was resolved.
     */
    private Object createWriter(Context sinkContext) {
        if (bulkWriterFormat != null) {
            return bulkWriterFormat.createRuntimeEncoder(sinkContext, getFormatDataType());
        } else if (serializationFormat != null) {
            return new SerializationSchemaAdapter(
                    serializationFormat.createRuntimeEncoder(sinkContext, getFormatDataType()));
        } else {
            throw new TableException("Can not find format factory.");
        }
    }

    /** Rejects explicit 'sink.parallelism' when the input stream contains non-INSERT changes. */
    private void checkConfiguredParallelismAllowed(ChangelogMode requestChangelogMode) {
        final Integer parallelism = this.configuredParallelism;
        if (parallelism == null) {
            return;
        }
        if (!requestChangelogMode.containsOnly(RowKind.INSERT)) {
            throw new ValidationException(
                    String.format(
                            "Currently, filesystem sink doesn't support setting parallelism (%d) by '%s' "
                                    + "when the input stream is not INSERT only. The row kinds of input stream are "
                                    + "[%s]",
                            parallelism,
                            FileSystemOptions.SINK_PARALLELISM.key(),
                            requestChangelogMode.getContainedKinds().stream()
                                    .map(RowKind::shortString)
                                    .collect(Collectors.joining(","))));
        }
    }

    /** One-file OutputFormat backed by a BulkWriter; overwrites the target path on open. */
    private static OutputFormat<RowData> createBulkWriterOutputFormat(
            BulkWriter.Factory<RowData> factory, Path path) {
        return new OutputFormat<RowData>() {

            private static final long serialVersionUID = 1L;

            private transient BulkWriter<RowData> writer;

            @Override
            public void configure(Configuration parameters) {
            }

            @Override
            public void open(int taskNumber, int numTasks) throws IOException {
                this.writer =
                        factory.create(
                                path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
            }

            @Override
            public void writeRecord(RowData record) throws IOException {
                writer.addElement(record);
            }

            @Override
            public void close() throws IOException {
                writer.flush();
                writer.finish();
            }
        };
    }

    /** One-file OutputFormat backed by a row Encoder; overwrites the target path on open. */
    private static OutputFormat<RowData> createEncoderOutputFormat(
            Encoder<RowData> encoder, Path path) {
        return new OutputFormat<RowData>() {

            private static final long serialVersionUID = 1L;

            private transient FSDataOutputStream output;

            @Override
            public void configure(Configuration parameters) {
            }

            @Override
            public void open(int taskNumber, int numTasks) throws IOException {
                this.output = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
            }

            @Override
            public void writeRecord(RowData record) throws IOException {
                encoder.encode(record, output);
            }

            @Override
            public void close() throws IOException {
                this.output.flush();
                this.output.close();
            }
        };
    }

    /** Keeps only the table's partition keys (in declaration order) from the given partition spec. */
    private LinkedHashMap<String, String> toPartialLinkedPartSpec(Map<String, String> part) {
        LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
        for (String partitionKey : partitionKeys) {
            if (part.containsKey(partitionKey)) {
                partSpec.put(partitionKey, part.get(partitionKey));
            }
        }
        return partSpec;
    }

    @Override
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        this.dynamicGrouping = supportsGrouping;
        return dynamicGrouping;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        checkConfiguredParallelismAllowed(requestedMode);
        // When ignore-all-changelog is enabled, accept every row kind; non-INSERT rows are
        // written as-is instead of being rejected.
        boolean ignoreChangelog = tableOptions.get(IGNORE_ALL_CHANGELOG);
        if (ignoreChangelog) {
            return ChangelogMode.all();
        }
        if (bulkWriterFormat != null) {
            return bulkWriterFormat.getChangelogMode();
        } else if (serializationFormat != null) {
            return serializationFormat.getChangelogMode();
        } else {
            throw new TableException("Can not find format factory.");
        }
    }

    @Override
    public DynamicTableSink copy() {
        // inlongMetric/inlongAudit are re-derived from tableOptions inside the constructor,
        // so only the planner-applied mutable state needs to be carried over explicitly.
        FileSystemTableSink sink =
                new FileSystemTableSink(
                        context,
                        bulkReaderFormat,
                        deserializationFormat,
                        formatFactory,
                        bulkWriterFormat,
                        serializationFormat);
        sink.overwrite = overwrite;
        sink.dynamicGrouping = dynamicGrouping;
        sink.staticPartitions = staticPartitions;
        return sink;
    }

    @Override
    public String asSummaryString() {
        return "Filesystem";
    }

    @Override
    public void applyOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    @Override
    public void applyStaticPartition(Map<String, String> partition) {
        this.staticPartitions = toPartialLinkedPartSpec(partition);
    }

    /**
     * Table bucket assigner, wrap {@link PartitionComputer}. Maps each row to its partition path
     * (e.g. "p1=v1/p2=v2/") which becomes the bucket id.
     */
    public static class TableBucketAssigner implements BucketAssigner<RowData, String> {

        private final PartitionComputer<RowData> computer;

        public TableBucketAssigner(PartitionComputer<RowData> computer) {
            this.computer = computer;
        }

        @Override
        public String getBucketId(RowData element, Context context) {
            try {
                return PartitionPathUtils.generatePartitionPath(
                        computer.generatePartValues(element));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    /**
     * Table {@link RollingPolicy}, it extends {@link CheckpointRollingPolicy} for bulk writers.
     * Rolls on checkpoint (when configured), on size, and on processing-time interval.
     */
    public static class TableRollingPolicy extends CheckpointRollingPolicy<RowData, String> {

        private final boolean rollOnCheckpoint;
        private final long rollingFileSize;
        private final long rollingTimeInterval;

        public TableRollingPolicy(
                boolean rollOnCheckpoint, long rollingFileSize, long rollingTimeInterval) {
            this.rollOnCheckpoint = rollOnCheckpoint;
            Preconditions.checkArgument(rollingFileSize > 0L);
            Preconditions.checkArgument(rollingTimeInterval > 0L);
            this.rollingFileSize = rollingFileSize;
            this.rollingTimeInterval = rollingTimeInterval;
        }

        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            try {
                return rollOnCheckpoint || partFileState.getSize() > rollingFileSize;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element)
                throws IOException {
            return partFileState.getSize() > rollingFileSize;
        }

        @Override
        public boolean shouldRollOnProcessingTime(
                PartFileInfo<String> partFileState, long currentTime) {
            return currentTime - partFileState.getCreationTime() >= rollingTimeInterval;
        }
    }

    /** Encoder that projects rows to their non-partition fields before delegating. */
    private static class ProjectionEncoder implements Encoder<RowData> {

        private final Encoder<RowData> encoder;
        private final RowDataPartitionComputer computer;

        private ProjectionEncoder(Encoder<RowData> encoder, RowDataPartitionComputer computer) {
            this.encoder = encoder;
            this.computer = computer;
        }

        @Override
        public void encode(RowData element, OutputStream stream) throws IOException {
            encoder.encode(computer.projectColumnsToWrite(element), stream);
        }
    }

    /**
     * Project row to non-partition fields.
     */
    public static class ProjectionBulkFactory implements BulkWriter.Factory<RowData> {

        private final BulkWriter.Factory<RowData> factory;
        private final RowDataPartitionComputer computer;

        public ProjectionBulkFactory(
                BulkWriter.Factory<RowData> factory, RowDataPartitionComputer computer) {
            this.factory = factory;
            this.computer = computer;
        }

        @Override
        public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
            BulkWriter<RowData> writer = factory.create(out);
            return new BulkWriter<RowData>() {

                @Override
                public void addElement(RowData element) throws IOException {
                    writer.addElement(computer.projectColumnsToWrite(element));
                }

                @Override
                public void flush() throws IOException {
                    writer.flush();
                }

                @Override
                public void finish() throws IOException {
                    writer.finish();
                }
            };
        }
    }
}
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
new file mode 100644
index 000000000..47516a270
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem.stream;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+/**
+ * Operator for file system sink. It is a operator version of {@link 
StreamingFileSink}. It can send
+ * file and bucket information to downstream.
+ */
+public abstract class AbstractStreamingWriter<IN, OUT> extends 
AbstractStreamOperator<OUT>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------------------------ configuration fields --------------------------
+
+    private final long bucketCheckInterval;
+
+    private final StreamingFileSink.BucketsBuilder<
+            IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, 
?>>
+            bucketsBuilder;
+
+    private String inlongMetric;
+
+    private String inlongAudit;
+
+    // --------------------------- runtime fields -----------------------------
+
+    private transient Buckets<IN, String> buckets;
+
+    private transient StreamingFileSinkHelper<IN> helper;
+
+    private transient long currentWatermark;
+
+    private SinkMetricData metricData;
+
+    public AbstractStreamingWriter(
+            long bucketCheckInterval,
+            StreamingFileSink.BucketsBuilder<
+                    IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, 
String, ?>>
+                    bucketsBuilder, String inlongMetric, String inlongAudit) {
+        this.bucketCheckInterval = bucketCheckInterval;
+        this.bucketsBuilder = bucketsBuilder;
+        this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    /**
+     * Notifies a partition created.
+     */
+    protected abstract void partitionCreated(String partition);
+
+    /**
+     * Notifies a partition become inactive. A partition becomes inactive 
after all the records
+     * received so far have been committed.
+     */
+    protected abstract void partitionInactive(String partition);
+
+    /**
+     * Notifies a new file has been opened.
+     *
+     * <p>Note that this does not mean that the file has been created in the 
file system. It is only
+     * created logically and the actual file will be generated after it is 
committed.
+     */
+    protected abstract void onPartFileOpened(String partition, Path newPath);
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        if (inlongMetric != null) {
+            String[] inLongMetricArray = inlongMetric.split(DELIMITER);
+            String groupId = inLongMetricArray[0];
+            String streamId = inLongMetricArray[1];
+            String nodeId = inLongMetricArray[2];
+            metricData = new SinkMetricData(
+                    groupId, streamId, nodeId, 
getRuntimeContext().getMetricGroup(), inlongAudit);
+            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
+            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
+            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
+            metricData.registerMetricsForNumRecordsOut(new 
ThreadSafeCounter());
+            metricData.registerMetricsForNumBytesOutPerSecond();
+            metricData.registerMetricsForNumRecordsOutPerSecond();
+        }
+    }
+
+    /**
+     * Commit up to this checkpoint id.
+     */
+    protected void commitUpToCheckpoint(long checkpointId) throws Exception {
+        helper.commitUpToCheckpoint(checkpointId);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+        buckets = 
bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
+
+        // Set listener before the initialization of Buckets.
+        buckets.setBucketLifeCycleListener(
+                new BucketLifeCycleListener<IN, String>() {
+
+                    @Override
+                    public void bucketCreated(Bucket<IN, String> bucket) {
+                        
AbstractStreamingWriter.this.partitionCreated(bucket.getBucketId());
+                    }
+
+                    @Override
+                    public void bucketInactive(Bucket<IN, String> bucket) {
+                        
AbstractStreamingWriter.this.partitionInactive(bucket.getBucketId());
+                    }
+                });
+
+        
buckets.setFileLifeCycleListener(AbstractStreamingWriter.this::onPartFileOpened);
+
+        helper =
+                new StreamingFileSinkHelper<>(
+                        buckets,
+                        context.isRestored(),
+                        context.getOperatorStateStore(),
+                        getRuntimeContext().getProcessingTimeService(),
+                        bucketCheckInterval);
+
+        currentWatermark = Long.MIN_VALUE;
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        helper.snapshotState(context.getCheckpointId());
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        super.processWatermark(mark);
+        currentWatermark = mark.getTimestamp();
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        helper.onElement(
+                element.getValue(),
+                getProcessingTimeService().getCurrentProcessingTime(),
+                element.hasTimestamp() ? element.getTimestamp() : null,
+                currentWatermark);
+        if (metricData != null) {
+            metricData.invokeWithEstimate(element.getValue());
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        commitUpToCheckpoint(checkpointId);
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        buckets.onProcessingTime(Long.MAX_VALUE);
+        helper.snapshotState(Long.MAX_VALUE);
+        output.emitWatermark(new Watermark(Long.MAX_VALUE));
+        commitUpToCheckpoint(Long.MAX_VALUE);
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        if (helper != null) {
+            helper.close();
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
new file mode 100644
index 000000000..bf0c15ca6
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem.stream;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Writer for emitting {@link PartitionCommitInfo} to downstream.
+ */
/**
 * Writer for emitting {@link PartitionCommitInfo} to downstream.
 *
 * <p>Tracks partitions created between checkpoints and partitions that became
 * inactive, and publishes them as committable once all files written up to a
 * checkpoint have been committed.
 */
public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, PartitionCommitInfo> {

    private static final long serialVersionUID = 2L;

    // Partitions created since the last state snapshot.
    private transient Set<String> currentNewPartitions;
    // New partitions keyed by the checkpoint id whose snapshot captured them.
    private transient TreeMap<Long, Set<String>> newPartitions;
    // Partitions reported inactive and therefore ready to be committed.
    private transient Set<String> committablePartitions;

    /**
     * @param bucketCheckInterval interval for checking buckets, in milliseconds
     * @param bucketsBuilder builder creating the buckets used by the writer
     * @param inlongMetric inlong metric label (may be null to disable metrics)
     * @param inlongAudit inlong audit address (may be null to disable audit)
     */
    public StreamingFileWriter(
            long bucketCheckInterval,
            StreamingFileSink.BucketsBuilder<
                    IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
                    bucketsBuilder, String inlongMetric, String inlongAudit) {
        super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        // Reset the transient bookkeeping before the parent restores its state,
        // since partitionCreated/partitionInactive may fire during restore.
        currentNewPartitions = new HashSet<>();
        newPartitions = new TreeMap<>();
        committablePartitions = new HashSet<>();
        super.initializeState(context);
    }

    @Override
    protected void partitionCreated(String partition) {
        currentNewPartitions.add(partition);
    }

    @Override
    protected void partitionInactive(String partition) {
        committablePartitions.add(partition);
    }

    @Override
    protected void onPartFileOpened(String s, Path newPath) {
        // Nothing to do: commit info is derived from created/inactive events only.
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // Freeze the partitions created during this checkpoint interval under
        // the checkpoint id; they are emitted once that checkpoint commits.
        newPartitions.put(context.getCheckpointId(), new HashSet<>(currentNewPartitions));
        currentNewPartitions.clear();
    }

    @Override
    protected void commitUpToCheckpoint(long checkpointId) throws Exception {
        super.commitUpToCheckpoint(checkpointId);

        // Gather every partition recorded at or before this checkpoint
        // (inclusive head of the TreeMap) plus the inactive ones, then drop
        // them from the bookkeeping so they are reported exactly once.
        NavigableMap<Long, Set<String>> headPartitions =
                this.newPartitions.headMap(checkpointId, true);
        Set<String> partitions = new HashSet<>(committablePartitions);
        committablePartitions.clear();
        headPartitions.values().forEach(partitions::addAll);
        headPartitions.clear();

        output.collect(
                new StreamRecord<>(
                        new PartitionCommitInfo(
                                checkpointId,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                getRuntimeContext().getNumberOfParallelSubtasks(),
                                new ArrayList<>(partitions))));
    }
}
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
new file mode 100644
index 000000000..6f7e8fb8a
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem.stream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.filesystem.FileSystemFactory;
+import org.apache.flink.table.filesystem.TableMetaStoreFactory;
+import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+import org.apache.flink.table.filesystem.stream.PartitionCommitter;
+import org.apache.flink.table.filesystem.stream.compact.CompactBucketWriter;
+import org.apache.flink.table.filesystem.stream.compact.CompactCoordinator;
+import 
org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
+import 
org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.table.filesystem.stream.compact.CompactOperator;
+import org.apache.flink.table.filesystem.stream.compact.CompactReader;
+import org.apache.flink.table.filesystem.stream.compact.CompactWriter;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.inlong.sort.filesystem.stream.compact.CompactFileWriter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
+
+/**
+ * Helper for creating streaming file sink.
+ */
/**
 * Helper for creating streaming file sink.
 */
public class StreamingSink {

    private StreamingSink() {
    }

    /**
     * Create a file writer by input stream. This is similar to {@link StreamingFileSink}, in
     * addition, it can emit {@link PartitionCommitInfo} to down stream.
     *
     * @param inputStream stream of records to write
     * @param bucketCheckInterval interval for checking buckets, in milliseconds
     * @param bucketsBuilder builder creating the buckets used by the writer
     * @param parallelism parallelism of the writer operator
     * @param inlongMetric inlong metric label (may be null to disable metrics)
     * @param inlongAudit inlong audit address (may be null to disable audit)
     */
    public static <T> DataStream<PartitionCommitInfo> writer(
            DataStream<T> inputStream,
            long bucketCheckInterval,
            StreamingFileSink.BucketsBuilder<
                    T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                    bucketsBuilder,
            int parallelism, String inlongMetric, String inlongAudit) {
        StreamingFileWriter<T> fileWriter =
                new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
        return inputStream
                .transform(
                        StreamingFileWriter.class.getSimpleName(),
                        TypeInformation.of(PartitionCommitInfo.class),
                        fileWriter)
                .setParallelism(parallelism);
    }

    /**
     * Create a file writer with compaction operators by input stream. In addition, it can emit
     * {@link PartitionCommitInfo} to down stream.
     *
     * <p>Topology: writer -> compact coordinator (parallelism 1) -> broadcast
     * -> compact operator.
     *
     * @param fsFactory factory for the file system the compactor reads/writes
     * @param path base path of the sink, used to create the file system
     * @param readFactory reader factory used by the compact operator
     * @param targetFileSize files are compacted towards this size, in bytes
     */
    public static <T> DataStream<PartitionCommitInfo> compactionWriter(
            DataStream<T> inputStream,
            long bucketCheckInterval,
            StreamingFileSink.BucketsBuilder<
                    T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                    bucketsBuilder,
            FileSystemFactory fsFactory,
            Path path,
            CompactReader.Factory<T> readFactory,
            long targetFileSize,
            int parallelism, String inlongMetric, String inlongAudit) {
        CompactFileWriter<T> writer = new CompactFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric,
                inlongAudit);

        // Serializable supplier so the file system is created on the task side.
        SupplierWithException<FileSystem, IOException> fsSupplier =
                (SupplierWithException<FileSystem, IOException> & Serializable)
                        () -> fsFactory.create(path.toUri());

        CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);

        // The coordinator must run as a single instance to get a global view
        // of the files to compact.
        SingleOutputStreamOperator<CoordinatorOutput> coordinatorOp =
                inputStream
                        .transform(
                                "streaming-writer",
                                TypeInformation.of(CoordinatorInput.class),
                                writer)
                        .setParallelism(parallelism)
                        .transform(
                                "compact-coordinator",
                                TypeInformation.of(CoordinatorOutput.class),
                                coordinator)
                        .setParallelism(1)
                        .setMaxParallelism(1);

        CompactWriter.Factory<T> writerFactory =
                CompactBucketWriter.factory(
                        (SupplierWithException<BucketWriter<T, String>, IOException> & Serializable)
                                bucketsBuilder::createBucketWriter);

        CompactOperator<T> compacter =
                new CompactOperator<>(fsSupplier, readFactory, writerFactory);

        // Broadcast so every compact subtask sees all coordinator decisions.
        return coordinatorOp
                .broadcast()
                .transform(
                        "compact-operator",
                        TypeInformation.of(PartitionCommitInfo.class),
                        compacter)
                .setParallelism(parallelism);
    }

    /**
     * Create a sink from file writer. Decide whether to add the node to commit partitions according
     * to options.
     *
     * @param writer upstream emitting {@link PartitionCommitInfo}
     * @param locationPath base location of the table
     * @param identifier identifier of the sink table
     * @param partitionKeys partition keys; empty for a non-partitioned table
     * @param msFactory metastore factory used by the partition committer
     * @param fsFactory file system factory used by the partition committer
     * @param options table options; a committer is added only if a partition
     *        commit policy is configured
     */
    public static DataStreamSink<?> sink(
            DataStream<PartitionCommitInfo> writer,
            Path locationPath,
            ObjectIdentifier identifier,
            List<String> partitionKeys,
            TableMetaStoreFactory msFactory,
            FileSystemFactory fsFactory,
            Configuration options) {
        DataStream<?> stream = writer;
        if (partitionKeys.size() > 0 && options.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
            // The committer is a single global instance so partition commits
            // happen exactly once per partition.
            PartitionCommitter committer =
                    new PartitionCommitter(
                            locationPath, identifier, partitionKeys, msFactory, fsFactory, options);
            stream =
                    writer.transform(
                                    PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
                            .setParallelism(1)
                            .setMaxParallelism(1);
        }

        return stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
    }
}
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
new file mode 100644
index 000000000..06018b9ef
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.filesystem.stream.compact;
+
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
+import 
org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
+import 
org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;
+import org.apache.inlong.sort.filesystem.stream.AbstractStreamingWriter;
+
+/**
+ * Writer for emitting {@link InputFile} and {@link EndCheckpoint} to 
downstream.
+ */
+public class CompactFileWriter<T>
+        extends AbstractStreamingWriter<T, CoordinatorInput> {
+
+    private static final long serialVersionUID = 1L;
+
+    public CompactFileWriter(
+            long bucketCheckInterval,
+            StreamingFileSink.BucketsBuilder<
+                    T, String, ? extends StreamingFileSink.BucketsBuilder<T, 
String, ?>>
+                    bucketsBuilder, String inlongMetric, String inlongAudit) {
+        super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
+    }
+
+    @Override
+    protected void partitionCreated(String partition) {
+    }
+
+    @Override
+    protected void partitionInactive(String partition) {
+    }
+
+    @Override
+    protected void onPartFileOpened(String partition, Path newPath) {
+        output.collect(new StreamRecord<>(new InputFile(partition, newPath)));
+    }
+
+    @Override
+    protected void commitUpToCheckpoint(long checkpointId) throws Exception {
+        super.commitUpToCheckpoint(checkpointId);
+        output.collect(
+                new StreamRecord<>(
+                        new EndCheckpoint(
+                                checkpointId,
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                
getRuntimeContext().getNumberOfParallelSubtasks())));
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-connectors/filesystem/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..142b40bfb
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,19 @@
+#
+#   Licensed to the Apache Software Foundation (ASF) under one
+#   or more contributor license agreements.  See the NOTICE file
+#   distributed with this work for additional information
+#   regarding copyright ownership.  The ASF licenses this file
+#   to you under the Apache License, Version 2.0 (the
+#   "License"); you may not use this file except in compliance
+#   with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+org.apache.inlong.sort.filesystem.FileSystemTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/pom.xml 
b/inlong-sort/sort-connectors/pom.xml
index 14eb9f268..2b785a462 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -54,6 +54,7 @@
         <module>elasticsearch-7</module>
         <module>redis</module>
         <module>tubemq</module>
+        <module>filesystem</module>
     </modules>
 
     <dependencies>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 925296280..94282be0c 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -211,6 +211,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-filesystem</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
new file mode 100644
index 000000000..178b8370f
--- /dev/null
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilesystemSqlParserTest.java
@@ -0,0 +1,129 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link FileSystemLoadNode}
+ */
/**
 * Test for {@link FileSystemLoadNode}.
 */
public class FilesystemSqlParserTest {

    /**
     * Build a MySQL extract node reading the "user" table.
     *
     * @return MySQL extract node
     */
    private MySqlExtractNode buildMySQLExtractNode() {
        List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new LongFormatInfo()),
                new FieldInfo("name", new StringFormatInfo()));
        Map<String, String> map = new HashMap<>();
        return new MySqlExtractNode("1", "mysql_input", fields,
                null, map, null,
                Collections.singletonList("user"), "localhost", "root", "inlong",
                "test", null, null,
                false, null);
    }

    /**
     * Build a filesystem load node writing csv files to HDFS.
     *
     * @return filesystem load node
     */
    private FileSystemLoadNode buildFileSystemLoadNode() {
        List<FieldInfo> fields = Arrays.asList(
                new FieldInfo("name", new StringFormatInfo()),
                new FieldInfo("age", new LongFormatInfo()));
        List<FieldRelation> relations = Arrays
                .asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                new FieldInfo("name", new StringFormatInfo())),
                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                new FieldInfo("age", new LongFormatInfo()))
                );
        Map<String, String> map = new HashMap<>();
        map.put("sink.ignore.changelog", "true");
        return new FileSystemLoadNode("2", "filesystem_node", fields, relations, null, "hdfs://127.0.0.1:8020/", "csv",
                1, map, null, null);
    }

    /**
     * Build the node relation connecting extract nodes to load nodes.
     *
     * @param inputs extract nodes
     * @param outputs load nodes
     * @return node relation
     */
    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
        return new NodeRelation(inputIds, outputIds);
    }

    /**
     * Test flink sql task where the extract is mysql {@link MySqlExtractNode} and the load is
     * filesystem {@link FileSystemLoadNode}.
     *
     * @throws Exception The exception may be thrown when executing
     */
    @Test
    public void testFlinkSqlParse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000);
        env.disableOperatorChaining();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        Node inputNode = buildMySQLExtractNode();
        Node outputNode = buildFileSystemLoadNode();
        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
                        Collections.singletonList(outputNode))));
        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
        ParseResult result = parser.parse();
        Assert.assertTrue(result.tryExecute());
    }
}
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 1363253d1..38eed8f85 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -528,6 +528,16 @@
   Source  : org.apache.bahir:flink-connector-redis_2.11:1.1-SNAPSHOT (Please 
note that the software have been modified.)
   License : https://github.com/apache/bahir-flink/blob/master/LICENSE
 
+ 1.3.10 
inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
+        
inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
+        
inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/AbstractFileSystemTable.java
+        
inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
+        
inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
+        
inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+        
inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
+  Source  : org.apache.flink:flink-table-runtime-blink_2.11:1.13.2-rc2 (Please note that the software have been modified.)
+  License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 
diff --git a/licenses/inlong-sort-connectors/NOTICE 
b/licenses/inlong-sort-connectors/NOTICE
index 75d6375ee..410c311bb 100644
--- a/licenses/inlong-sort-connectors/NOTICE
+++ b/licenses/inlong-sort-connectors/NOTICE
@@ -2559,4 +2559,19 @@ Apache Parquet Hadoop NOTICE
 
 The NOTICE content is too long  and is included at notices/NOTICE-[project].txt
 
+// ------------------------------------------------------------------
+// NOTICE file corresponding to the section 4d of The Apache License,
+// Version 2.0, in this case for Apache Flink
+// ------------------------------------------------------------------
+
+Apache Flink
+Copyright 2006-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Flink : Table : Runtime Blink
+Copyright 2014-2021 The Apache Software Foundation
+
 

Reply via email to