This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e07e9a7cc [Connector-V2] [Clickhouse] Improve Clickhouse File 
Connector (#3416)
e07e9a7cc is described below

commit e07e9a7cc29c8a77f5600634204efd4ec3884f19
Author: Hisoka <[email protected]>
AuthorDate: Tue Dec 13 10:24:54 2022 +0800

    [Connector-V2] [Clickhouse] Improve Clickhouse File Connector (#3416)
    
    * [Clickhouse] Change Random number to Context index
    
    * Update docs/en/connector-v2/sink/ClickhouseFile.md
---
 docs/en/connector-v2/sink/ClickhouseFile.md        |  55 ++++---
 .../clickhouse/config/ClickhouseConfig.java        |  16 +++
 .../clickhouse/config/FileReaderOption.java        |  86 ++---------
 .../clickhouse/sink/file/ClickhouseFileSink.java   |  42 +++++-
 .../sink/file/ClickhouseFileSinkAggCommitter.java  |  94 ++++++++++++
 .../sink/file/ClickhouseFileSinkFactory.java       |   6 +-
 .../sink/file/ClickhouseFileSinkWriter.java        | 159 +++++++++++++--------
 .../clickhouse/state/CKFileAggCommitInfo.java}     |  24 ++--
 .../clickhouse/state/CKFileCommitInfo.java}        |  24 ++--
 .../clickhouse/ClickhouseFactoryTest.java          |   2 +
 10 files changed, 334 insertions(+), 174 deletions(-)

diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md 
b/docs/en/connector-v2/sink/ClickhouseFile.md
index 86f762a9c..1eb2458d0 100644
--- a/docs/en/connector-v2/sink/ClickhouseFile.md
+++ b/docs/en/connector-v2/sink/ClickhouseFile.md
@@ -21,22 +21,25 @@ Write data to Clickhouse can also be done using JDBC
 
 ## Options
 
-| name                   | type    | required | default value |
-| ---------------------- | ------- | -------- | ------------- |
-| host                   | string  | yes      | -             |
-| database               | string  | yes      | -             |
-| table                  | string  | yes      | -             |
-| username               | string  | yes      | -             |
-| password               | string  | yes      | -             |
-| clickhouse_local_path  | string  | yes      | -             |
-| sharding_key           | string  | no       | -             |
-| copy_method            | string  | no       | scp           |
-| node_free_password     | boolean | no       | false         |
-| node_pass              | list    | no       | -             |
-| node_pass.node_address | string  | no       | -             |
-| node_pass.username     | string  | no       | "root"        |
-| node_pass.password     | string  | no       | -             |
-| common-options         |         | no       | -             |
+| name                   | type    | required | default value                  
        |
+|------------------------|---------|----------|----------------------------------------|
+| host                   | string  | yes      | -                              
        |
+| database               | string  | yes      | -                              
        |
+| table                  | string  | yes      | -                              
        |
+| username               | string  | yes      | -                              
        |
+| password               | string  | yes      | -                              
        |
+| clickhouse_local_path  | string  | yes      | -                              
        |
+| sharding_key           | string  | no       | -                              
        |
+| copy_method            | string  | no       | scp                            
        |
+| node_free_password     | boolean | no       | false                          
        |
+| node_pass              | list    | no       | -                              
        |
+| node_pass.node_address | string  | no       | -                              
        |
+| node_pass.username     | string  | no       | "root"                         
        |
+| node_pass.password     | string  | no       | -                              
        |
+| compatible_mode        | boolean | no       | false                          
        |
+| file_fields_delimiter  | string  | no       | "\t"                           
        |
+| file_temp_path         | string  | no       | 
"/tmp/seatunnel/clickhouse-local/file" |
+| common-options         |         | no       | -                              
        |
 
 ### host [string]
 
@@ -94,6 +97,21 @@ The username corresponding to the clickhouse server, default 
root user.
 
 The password corresponding to the clickhouse server.
 
+### compatible_mode [boolean]
+
+In the lower version of Clickhouse, the ClickhouseLocal program does not 
support the `--path` parameter,
+you need to use this mode to take other ways to realize the `--path` parameter 
function
+
+### file_fields_delimiter [string]
+
+ClickhouseFile uses csv format to temporarily save data. If the data in the 
row contains the delimiter value
+of csv, it may cause program exceptions. 
+Avoid this with this configuration. Value string has to be an exactly one 
character long
+
+### file_temp_path [string]
+
+The directory where ClickhouseFile stores temporary files locally.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
@@ -122,3 +140,8 @@ Sink plugin common parameters, please refer to [Sink Common 
Options](common-opti
 ### 2.2.0-beta 2022-09-26
 
 - Support write data to ClickHouse File and move to ClickHouse data dir
+
+### Next version
+
+- [BugFix] Fix generated data part name conflict and improve file commit logic 
[3416](https://github.com/apache/incubator-seatunnel/pull/3416)
+- [Feature] Support compatible_mode compatible with lower version Clickhouse  
[3416](https://github.com/apache/incubator-seatunnel/pull/3416)
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index 62f7bd398..3f48df990 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -96,7 +96,16 @@ public class ClickhouseConfig {
     public static final Option<ClickhouseFileCopyMethod> COPY_METHOD = 
Options.key("copy_method").enumType(ClickhouseFileCopyMethod.class)
         .defaultValue(ClickhouseFileCopyMethod.SCP).withDescription("The 
method of copy Clickhouse file");
 
+    public static final Option<Boolean> COMPATIBLE_MODE = 
Options.key("compatible_mode").booleanType()
+        .defaultValue(false).withDescription("In the lower version of 
Clickhouse, the ClickhouseLocal program does not support the `--path` 
parameter, " +
+            "you need to use this mode to take other ways to realize the 
--path parameter function");
+
     public static final String NODE_ADDRESS = "node_address";
+
+    public static final Option<Boolean> NODE_FREE_PASSWORD = 
Options.key("node_free_password").booleanType()
+        .defaultValue(false).withDescription("Because seatunnel need to use 
scp or rsync for file transfer, " +
+            "seatunnel need clickhouse server-side access. If each spark node 
and clickhouse server are configured with password-free login, " +
+            "you can configure this option to true, otherwise you need to 
configure the corresponding node password in the node_pass configuration");
     /**
      * The password of Clickhouse server node
      */
@@ -106,4 +115,11 @@ public class ClickhouseConfig {
     public static final Option<Map<String, String>> CLICKHOUSE_PREFIX = 
Options.key("clickhouse").mapType()
         .defaultValue(Collections.emptyMap()).withDescription("Clickhouse 
custom config");
 
+    public static final Option<String> FILE_FIELDS_DELIMITER = 
Options.key("file_fields_delimiter").stringType()
+        .defaultValue("\t").withDescription("ClickhouseFile uses csv format to 
temporarily save data. If the data in the row contains the delimiter value of 
csv," +
+            " it may cause program exceptions. Avoid this with this 
configuration. Value string has to be an exactly one character long");
+
+    public static final Option<String> FILE_TEMP_PATH = 
Options.key("file_temp_path").stringType()
+        
.defaultValue("/tmp/seatunnel/clickhouse-local/file").withDescription("The 
directory where ClickhouseFile stores temporary files locally.");
+
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
index 283c1c4f9..fb00ee995 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
@@ -20,10 +20,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
 
+import lombok.Data;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
+@Data
 public class FileReaderOption implements Serializable {
 
     private ShardMetadata shardMetadata;
@@ -35,90 +38,29 @@ public class FileReaderOption implements Serializable {
     private Map<String, String> nodeUser;
     private Map<String, String> nodePassword;
     private SeaTunnelRowType seaTunnelRowType;
+    private boolean compatibleMode;
+    private String fileTempPath;
+    private String fileFieldsDelimiter;
 
     public FileReaderOption(ShardMetadata shardMetadata, Map<String, String> 
tableSchema,
                             List<String> fields, String clickhouseLocalPath,
                             ClickhouseFileCopyMethod copyMethod,
                             Map<String, String> nodeUser,
-                            Map<String, String> nodePassword) {
+                            boolean nodeFreePass,
+                            Map<String, String> nodePassword,
+                            boolean compatibleMode,
+                            String fileTempPath,
+                            String fileFieldsDelimiter) {
         this.shardMetadata = shardMetadata;
         this.tableSchema = tableSchema;
         this.fields = fields;
         this.clickhouseLocalPath = clickhouseLocalPath;
         this.copyMethod = copyMethod;
         this.nodeUser = nodeUser;
-        this.nodePassword = nodePassword;
-    }
-
-    public SeaTunnelRowType getSeaTunnelRowType() {
-        return seaTunnelRowType;
-    }
-
-    public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
-    public boolean isNodeFreePass() {
-        return nodeFreePass;
-    }
-
-    public void setNodeFreePass(boolean nodeFreePass) {
         this.nodeFreePass = nodeFreePass;
-    }
-
-    public String getClickhouseLocalPath() {
-        return clickhouseLocalPath;
-    }
-
-    public void setClickhouseLocalPath(String clickhouseLocalPath) {
-        this.clickhouseLocalPath = clickhouseLocalPath;
-    }
-
-    public ClickhouseFileCopyMethod getCopyMethod() {
-        return copyMethod;
-    }
-
-    public void setCopyMethod(ClickhouseFileCopyMethod copyMethod) {
-        this.copyMethod = copyMethod;
-    }
-
-    public Map<String, String> getNodeUser() {
-        return nodeUser;
-    }
-
-    public void setNodeUser(Map<String, String> nodeUser) {
-        this.nodeUser = nodeUser;
-    }
-
-    public Map<String, String> getNodePassword() {
-        return nodePassword;
-    }
-
-    public void setNodePassword(Map<String, String> nodePassword) {
         this.nodePassword = nodePassword;
-    }
-
-    public ShardMetadata getShardMetadata() {
-        return shardMetadata;
-    }
-
-    public void setShardMetadata(ShardMetadata shardMetadata) {
-        this.shardMetadata = shardMetadata;
-    }
-
-    public Map<String, String> getTableSchema() {
-        return tableSchema;
-    }
-
-    public void setTableSchema(Map<String, String> tableSchema) {
-        this.tableSchema = tableSchema;
-    }
-
-    public List<String> getFields() {
-        return fields;
-    }
-
-    public void setFields(List<String> fields) {
-        this.fields = fields;
+        this.compatibleMode = compatibleMode;
+        this.fileFieldsDelimiter = fileFieldsDelimiter;
+        this.fileTempPath = fileTempPath;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 973478a8b..7aa5010f2 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -18,11 +18,15 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_ADDRESS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
@@ -31,7 +35,10 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -46,8 +53,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.Clickhouse
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
 
@@ -62,10 +69,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 @AutoService(SeaTunnelSink.class)
-public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, 
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
+public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, 
ClickhouseSinkState, CKFileCommitInfo, CKFileAggCommitInfo> {
 
     private FileReaderOption readerOption;
 
@@ -85,6 +93,10 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
         }
         Map<String, Object> defaultConfigs = ImmutableMap.<String, 
Object>builder()
             .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName())
+            .put(NODE_FREE_PASSWORD.key(), NODE_FREE_PASSWORD.defaultValue())
+            .put(COMPATIBLE_MODE.key(), COMPATIBLE_MODE.defaultValue())
+            .put(FILE_TEMP_PATH.key(), FILE_TEMP_PATH.defaultValue())
+            .put(FILE_FIELDS_DELIMITER.key(), 
FILE_FIELDS_DELIMITER.defaultValue())
             .build();
 
         config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
@@ -126,8 +138,13 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
                 configObject -> 
configObject.toConfig().getString(PASSWORD.key())));
 
         proxy.close();
+
+        if (config.getString(FILE_FIELDS_DELIMITER.key()).length() != 1) {
+            throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, 
FILE_FIELDS_DELIMITER.key() + " must be a single character");
+        }
         this.readerOption = new FileReaderOption(shardMetadata, tableSchema, 
fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()),
-            
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, 
nodePassword);
+            
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, 
config.getBoolean(NODE_FREE_PASSWORD.key()), nodePassword,
+            config.getBoolean(COMPATIBLE_MODE.key()), 
config.getString(FILE_TEMP_PATH.key()), 
config.getString(FILE_FIELDS_DELIMITER.key()));
     }
 
     @Override
@@ -141,7 +158,22 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
     }
 
     @Override
-    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
+    public SinkWriter<SeaTunnelRow, CKFileCommitInfo, ClickhouseSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
         return new ClickhouseFileSinkWriter(readerOption, context);
     }
+
+    @Override
+    public Optional<Serializer<CKFileCommitInfo>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<CKFileCommitInfo, 
CKFileAggCommitInfo>> createAggregatedCommitter() throws IOException {
+        return Optional.of(new 
ClickhouseFileSinkAggCommitter(this.readerOption));
+    }
+
+    @Override
+    public Optional<Serializer<CKFileAggCommitInfo>> 
getAggregatedCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
new file mode 100644
index 000000000..80ce83aa5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
+
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseResponse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClickhouseFileSinkAggCommitter implements 
SinkAggregatedCommitter<CKFileCommitInfo, CKFileAggCommitInfo> {
+
+    private final ClickhouseProxy proxy;
+    private final ClickhouseTable clickhouseTable;
+
+    public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) {
+        proxy = new 
ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
+        clickhouseTable = 
proxy.getClickhouseTable(readerOption.getShardMetadata().getDatabase(),
+            readerOption.getShardMetadata().getTable());
+    }
+
+    @Override
+    public List<CKFileAggCommitInfo> commit(List<CKFileAggCommitInfo> 
aggregatedCommitInfo) throws IOException {
+        aggregatedCommitInfo.forEach(commitInfo -> 
commitInfo.getDetachedFiles().forEach((shard, files) -> {
+            try {
+                this.attachFileToClickhouse(shard, files);
+            } catch (ClickHouseException e) {
+                throw new SeaTunnelException("failed commit file to 
clickhouse", e);
+            }
+        }));
+        return new ArrayList<>();
+    }
+
+    @Override
+    public CKFileAggCommitInfo combine(List<CKFileCommitInfo> commitInfos) {
+        Map<Shard, List<String>> files = new HashMap<>();
+        commitInfos.forEach(infos -> infos.getDetachedFiles().forEach((shard, 
file) -> {
+            if (files.containsKey(shard)) {
+                files.get(shard).addAll(file);
+            } else {
+                files.put(shard, file);
+            }
+        }));
+        return new CKFileAggCommitInfo(files);
+    }
+
+    @Override
+    public void abort(List<CKFileAggCommitInfo> aggregatedCommitInfo) throws 
Exception {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        proxy.close();
+    }
+
+    private void attachFileToClickhouse(Shard shard, List<String> 
clickhouseLocalFiles) throws ClickHouseException {
+        ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
+        for (String clickhouseLocalFile : clickhouseLocalFiles) {
+            ClickHouseResponse response = request.query(String.format("ALTER 
TABLE %s ATTACH PART '%s'",
+                clickhouseTable.getLocalTableName(),
+                
clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 
1))).executeAndWait();
+            response.close();
+        }
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
index 2d9ccf220..829d8e005 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
@@ -18,10 +18,14 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
@@ -44,6 +48,6 @@ public class ClickhouseFileSinkFactory implements 
TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, 
PASSWORD, CLICKHOUSE_LOCAL_PATH)
-            .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_PASS).build();
+            .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, 
NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 6f24e2c79..14b5c181c 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -27,17 +27,15 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.Clickhouse
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
 
-import com.clickhouse.client.ClickHouseException;
-import com.clickhouse.client.ClickHouseRequest;
-import com.clickhouse.client.ClickHouseResponse;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -48,7 +46,6 @@ import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,24 +55,32 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 @Slf4j
-public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, 
CKCommitInfo, ClickhouseSinkState> {
-    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = 
"/tmp/clickhouse-local/seatunnel-file";
+public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, 
CKFileCommitInfo, ClickhouseSinkState> {
+
+    private static final String CK_LOCAL_CONFIG_TEMPLATE = "<yandex><path> %s 
</path> <users><default><password/> <profile>default</profile> 
<quota>default</quota>" +
+        
"<access_management>1</access_management></default></users><profiles><default/></profiles><quotas><default/></quotas></yandex>";
+    private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX = 
"/local_data.log";
     private static final int UUID_LENGTH = 10;
     private final FileReaderOption readerOption;
     private final ShardRouter shardRouter;
     private final ClickhouseProxy proxy;
     private final ClickhouseTable clickhouseTable;
     private final Map<Shard, List<String>> shardLocalDataPaths;
-    private final Map<Shard, List<SeaTunnelRow>> rowCache;
+    private final Map<Shard, FileChannel> rowCache;
+
+    private final Map<Shard, String> shardTempFile;
+
+    private final SinkWriter.Context context;
 
     public ClickhouseFileSinkWriter(FileReaderOption readerOption, 
SinkWriter.Context context) {
         this.readerOption = readerOption;
+        this.context = context;
         proxy = new 
ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
         shardRouter = new ShardRouter(proxy, 
this.readerOption.getShardMetadata());
         clickhouseTable = 
proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(),
             this.readerOption.getShardMetadata().getTable());
         rowCache = new HashMap<>(Common.COLLECTION_SIZE);
-
+        shardTempFile = new HashMap<>();
         nodePasswordCheck();
 
         // find file local save path of each node
@@ -90,7 +95,20 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
     @Override
     public void write(SeaTunnelRow element) throws IOException {
         Shard shard = shardRouter.getShard(element);
-        rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(element);
+        FileChannel channel = rowCache.computeIfAbsent(shard, k -> {
+            try {
+                String uuid = UUID.randomUUID().toString().substring(0, 
UUID_LENGTH).replaceAll("-", "_");
+                String clickhouseLocalFile = String.format("%s/%s", 
readerOption.getFileTempPath(), uuid);
+                FileUtils.forceMkdir(new File(clickhouseLocalFile));
+                String clickhouseLocalFileTmpFile = clickhouseLocalFile + 
CLICKHOUSE_LOCAL_FILE_SUFFIX;
+                shardTempFile.put(shard, clickhouseLocalFileTmpFile);
+                return FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), 
StandardOpenOption.WRITE,
+                    StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
+            } catch (IOException e) {
+                throw new 
ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "can't 
create new file to save tmp data", e);
+            }
+        });
+        saveDataToFile(channel, element);
     }
 
     private void nodePasswordCheck() {
@@ -105,62 +123,67 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
     }
 
     @Override
-    public Optional<CKCommitInfo> prepareCommit() throws IOException {
-        return Optional.empty();
+    public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
+        for (FileChannel channel : rowCache.values()) {
+            channel.close();
+        }
+        Map<Shard, List<String>> detachedFiles = new HashMap<>();
+        shardTempFile.forEach((shard, path) -> {
+            List<String> clickhouseLocalFiles = null;
+            try {
+                clickhouseLocalFiles = generateClickhouseLocalFiles(path);
+                // move file to server
+                moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+                detachedFiles.put(shard, clickhouseLocalFiles);
+            } catch (Exception e) {
+                throw new 
ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data 
into clickhouse file error", e);
+            } finally {
+                if (clickhouseLocalFiles != null && 
!clickhouseLocalFiles.isEmpty()) {
+                    // clear local file
+                    clearLocalFileDirectory(clickhouseLocalFiles);
+                }
+            }
+        });
+        rowCache.clear();
+        shardTempFile.clear();
+        return Optional.of(new CKFileCommitInfo(detachedFiles));
     }
 
     @Override
     public void abortPrepare() {
-
     }
 
     @Override
     public void close() throws IOException {
-        rowCache.forEach(this::flush);
+        for (FileChannel channel : rowCache.values()) {
+            channel.close();
+        }
     }
 
-    private void flush(Shard shard, List<SeaTunnelRow> rows) {
-        try {
-            // generate clickhouse local file
-            // TODO generate file by sub rows to save memory
-            List<String> clickhouseLocalFiles = 
generateClickhouseLocalFiles(rows);
-            // move file to server
-            attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
-            // clear local file
-            clearLocalFileDirectory(clickhouseLocalFiles);
-        } catch (Exception e) {
-            throw new 
ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data 
into clickhouse file error", e);
-        }
+    private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) 
throws IOException {
+        String data = this.readerOption.getFields().stream().map(field -> 
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+            
.collect(Collectors.joining(readerOption.getFileFieldsDelimiter())) + "\n";
+        MappedByteBuffer buffer = 
fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+            data.getBytes(StandardCharsets.UTF_8).length);
+        buffer.put(data.getBytes(StandardCharsets.UTF_8));
     }
 
-    private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows) 
throws IOException,
+    private List<String> generateClickhouseLocalFiles(String 
clickhouseLocalFileTmpFile) throws IOException,
         InterruptedException {
-        if (rows.isEmpty()) {
-            return Collections.emptyList();
-        }
-        String uuid = UUID.randomUUID().toString().substring(0, 
UUID_LENGTH).replaceAll("-", "_");
-        String clickhouseLocalFile = String.format("%s/%s", 
CLICKHOUSE_LOCAL_FILE_PREFIX, uuid);
-        FileUtils.forceMkdir(new File(clickhouseLocalFile));
-        String clickhouseLocalFileTmpFile = clickhouseLocalFile + 
"/local_data.log";
-        try (FileChannel fileChannel = 
FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), 
StandardOpenOption.WRITE,
-            StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
-            String data = rows.stream()
-                .map(row -> this.readerOption.getFields().stream().map(field 
-> 
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
-                    .collect(Collectors.joining("\t")))
-                .collect(Collectors.joining("\n"));
-            MappedByteBuffer buffer = 
fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
-                data.getBytes(StandardCharsets.UTF_8).length);
-            buffer.put(data.getBytes(StandardCharsets.UTF_8));
-        }
-
+        // temp file path format prefix/<uuid>/suffix
+        String[] tmpStrArr = clickhouseLocalFileTmpFile.split("/");
+        String uuid = tmpStrArr[tmpStrArr.length - 2];
         List<String> localPaths = 
Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" "))
             .collect(Collectors.toList());
+        String clickhouseLocalFile = clickhouseLocalFileTmpFile.substring(0, 
clickhouseLocalFileTmpFile.length() - CLICKHOUSE_LOCAL_FILE_SUFFIX.length());
         List<String> command = new ArrayList<>(localPaths);
         if (localPaths.size() == 1) {
             command.add("local");
         }
         command.add("--file");
         command.add(clickhouseLocalFileTmpFile);
+        command.add("--format_csv_delimiter");
+        command.add("\"" + readerOption.getFileFieldsDelimiter() + "\"");
         command.add("-S");
         command.add("\"" + this.readerOption.getFields().stream().map(field -> 
field + " " + 
readerOption.getTableSchema().get(field)).collect(Collectors.joining(",")) + 
"\"");
         command.add("-N");
@@ -178,8 +201,19 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
                 }
             }).collect(Collectors.joining(",")),
             uuid));
-        command.add("--path");
-        command.add("\"" + clickhouseLocalFile + "\"");
+        if (readerOption.isCompatibleMode()) {
+            String ckLocalConfigPath = String.format("%s/%s/config.xml", 
readerOption.getFileTempPath(), uuid);
+            try (FileWriter writer = new FileWriter(ckLocalConfigPath)) {
+                writer.write(String.format(CK_LOCAL_CONFIG_TEMPLATE, 
clickhouseLocalFile));
+            } catch (IOException e) {
+                throw new 
ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Error 
occurs when create ck local config");
+            }
+            command.add("--config-file");
+            command.add("\"" + ckLocalConfigPath + "\"");
+        } else {
+            command.add("--path");
+            command.add("\"" + clickhouseLocalFile + "\"");
+        }
         log.info("Generate clickhouse local file command: {}", String.join(" 
", command));
         ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", 
String.join(" ", command));
         Process start = processBuilder.start();
@@ -192,6 +226,14 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
                 log.info(line);
             }
         }
+        try (InputStream inputStream = start.getErrorStream();
+             InputStreamReader inputStreamReader = new 
InputStreamReader(inputStream);
+             BufferedReader bufferedReader = new 
BufferedReader(inputStreamReader)) {
+            String line;
+            while ((line = bufferedReader.readLine()) != null) {
+                log.error(line);
+            }
+        }
         start.waitFor();
         File file = new File(clickhouseLocalFile + "/data/_local/" + 
clickhouseTable.getLocalTableName());
         if (!file.exists()) {
@@ -204,10 +246,18 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
         return Arrays.stream(files)
             .filter(File::isDirectory)
             .filter(f -> !"detached".equals(f.getName()))
-            .map(File::getAbsolutePath).collect(Collectors.toList());
+            .map(f -> {
+                File newFile = new File(f.getParent() + "/" + f.getName() + 
"_" + context.getIndexOfSubtask());
+                if (f.renameTo(newFile)) {
+                    return newFile;
+                } else {
+                    log.warn("rename file failed, will continue move file, but 
maybe cause file conflict");
+                    return f;
+                }
+            }).map(File::getAbsolutePath).collect(Collectors.toList());
     }
 
-    private void attachClickhouseLocalFileToServer(Shard shard, List<String> 
clickhouseLocalFiles) throws ClickHouseException {
+    private void moveClickhouseLocalFileToServer(Shard shard, List<String> 
clickhouseLocalFiles) {
         String hostAddress = shard.getNode().getAddress().getHostName();
         String user = readerOption.getNodeUser().getOrDefault(hostAddress, 
"root");
         String password = 
readerOption.getNodePassword().getOrDefault(hostAddress, null);
@@ -215,22 +265,15 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
         fileTransfer.init();
         fileTransfer.transferAndChown(clickhouseLocalFiles, 
shardLocalDataPaths.get(shard).get(0) + "detached/");
         fileTransfer.close();
-        ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
-        for (String clickhouseLocalFile : clickhouseLocalFiles) {
-            ClickHouseResponse response = request.query(String.format("ALTER 
TABLE %s ATTACH PART '%s'",
-                clickhouseTable.getLocalTableName(),
-                
clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 
1))).executeAndWait();
-            response.close();
-        }
     }
 
     private void clearLocalFileDirectory(List<String> clickhouseLocalFiles) {
         String clickhouseLocalFile = clickhouseLocalFiles.get(0);
-        String localFileDir = clickhouseLocalFile.substring(0, 
CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1);
+        String localFileDir = clickhouseLocalFile.substring(0, 
readerOption.getFileTempPath().length() + UUID_LENGTH + 1);
         try {
             File file = new File(localFileDir);
             if (file.exists()) {
-                FileUtils.deleteDirectory(new File(localFileDir));
+                FileUtils.deleteDirectory(file);
             }
         } catch (IOException e) {
             throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.DELETE_DIRECTORY_FIELD,
 "Unable to delete directory " + localFileDir, e);
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java
similarity index 58%
copy from 
seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
copy to 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java
index 2a4205016..9962b9dcb 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java
@@ -15,19 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
 
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-public class ClickhouseFactoryTest {
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class CKFileAggCommitInfo implements Serializable {
+
+    private Map<Shard, List<String>> detachedFiles;
 
-    @Test
-    public void testOptionRule() {
-        Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule());
-        Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule());
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java
similarity index 58%
copy from 
seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
copy to 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java
index 2a4205016..1b5399b7c 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java
@@ -15,19 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
 
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-public class ClickhouseFactoryTest {
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class CKFileCommitInfo implements Serializable {
+
+    private Map<Shard, List<String>> detachedFiles;
 
-    @Test
-    public void testOptionRule() {
-        Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule());
-        Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule());
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
index 2a4205016..e6c50b061 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse;
 
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;
 
 import org.junit.jupiter.api.Assertions;
@@ -29,5 +30,6 @@ public class ClickhouseFactoryTest {
     public void testOptionRule() {
         Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule());
         Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule());
+        Assertions.assertNotNull((new 
ClickhouseFileSinkFactory()).optionRule());
     }
 }


Reply via email to