This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e07e9a7cc [Connector-V2] [Clickhouse] Improve Clickhouse File
Connector (#3416)
e07e9a7cc is described below
commit e07e9a7cc29c8a77f5600634204efd4ec3884f19
Author: Hisoka <[email protected]>
AuthorDate: Tue Dec 13 10:24:54 2022 +0800
[Connector-V2] [Clickhouse] Improve Clickhouse File Connector (#3416)
* [Clickhouse] Change Random number to Context index
* Update docs/en/connector-v2/sink/ClickhouseFile.md
---
docs/en/connector-v2/sink/ClickhouseFile.md | 55 ++++---
.../clickhouse/config/ClickhouseConfig.java | 16 +++
.../clickhouse/config/FileReaderOption.java | 86 ++---------
.../clickhouse/sink/file/ClickhouseFileSink.java | 42 +++++-
.../sink/file/ClickhouseFileSinkAggCommitter.java | 94 ++++++++++++
.../sink/file/ClickhouseFileSinkFactory.java | 6 +-
.../sink/file/ClickhouseFileSinkWriter.java | 159 +++++++++++++--------
.../clickhouse/state/CKFileAggCommitInfo.java} | 24 ++--
.../clickhouse/state/CKFileCommitInfo.java} | 24 ++--
.../clickhouse/ClickhouseFactoryTest.java | 2 +
10 files changed, 334 insertions(+), 174 deletions(-)
diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md
b/docs/en/connector-v2/sink/ClickhouseFile.md
index 86f762a9c..1eb2458d0 100644
--- a/docs/en/connector-v2/sink/ClickhouseFile.md
+++ b/docs/en/connector-v2/sink/ClickhouseFile.md
@@ -21,22 +21,25 @@ Write data to Clickhouse can also be done using JDBC
## Options
-| name | type | required | default value |
-| ---------------------- | ------- | -------- | ------------- |
-| host | string | yes | - |
-| database | string | yes | - |
-| table | string | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| clickhouse_local_path | string | yes | - |
-| sharding_key | string | no | - |
-| copy_method | string | no | scp |
-| node_free_password | boolean | no | false |
-| node_pass | list | no | - |
-| node_pass.node_address | string | no | - |
-| node_pass.username | string | no | "root" |
-| node_pass.password | string | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|------------------------|---------|----------|----------------------------------------|
+| host | string | yes | - |
+| database | string | yes | - |
+| table | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| clickhouse_local_path | string | yes | - |
+| sharding_key | string | no | - |
+| copy_method | string | no | scp |
+| node_free_password | boolean | no | false |
+| node_pass | list | no | - |
+| node_pass.node_address | string | no | - |
+| node_pass.username | string | no | "root" |
+| node_pass.password | string | no | - |
+| compatible_mode | boolean | no | false |
+| file_fields_delimiter | string | no | "\t" |
+| file_temp_path | string | no | "/tmp/seatunnel/clickhouse-local/file" |
+| common-options | | no | - |
### host [string]
@@ -94,6 +97,21 @@ The username corresponding to the clickhouse server, default
root user.
The password corresponding to the clickhouse server.
+### compatible_mode [boolean]
+
+In lower versions of Clickhouse, the ClickhouseLocal program does not support the `--path` parameter.
+Enable this mode to achieve the function of the `--path` parameter in another way.
+
+### file_fields_delimiter [string]
+
+ClickhouseFile uses csv format to temporarily save data. If the data in a row contains the csv delimiter value,
+it may cause program exceptions.
+Use this option to configure a different delimiter and avoid the problem. The value must be exactly one character long.
+### file_temp_path [string]
+
+The directory where ClickhouseFile stores temporary files locally.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
@@ -122,3 +140,8 @@ Sink plugin common parameters, please refer to [Sink Common
Options](common-opti
### 2.2.0-beta 2022-09-26
- Support write data to ClickHouse File and move to ClickHouse data dir
+
+### Next version
+
+- [BugFix] Fix generated data part name conflict and improve file commit logic [3416](https://github.com/apache/incubator-seatunnel/pull/3416)
+- [Feature] Support `compatible_mode` for compatibility with lower versions of Clickhouse [3416](https://github.com/apache/incubator-seatunnel/pull/3416)
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index 62f7bd398..3f48df990 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -96,7 +96,16 @@ public class ClickhouseConfig {
public static final Option<ClickhouseFileCopyMethod> COPY_METHOD =
Options.key("copy_method").enumType(ClickhouseFileCopyMethod.class)
.defaultValue(ClickhouseFileCopyMethod.SCP).withDescription("The
method of copy Clickhouse file");
+ public static final Option<Boolean> COMPATIBLE_MODE =
Options.key("compatible_mode").booleanType()
+ .defaultValue(false).withDescription("In the lower version of
Clickhouse, the ClickhouseLocal program does not support the `--path`
parameter, " +
+ "you need to use this mode to take other ways to realize the
--path parameter function");
+
public static final String NODE_ADDRESS = "node_address";
+
+ public static final Option<Boolean> NODE_FREE_PASSWORD =
Options.key("node_free_password").booleanType()
+ .defaultValue(false).withDescription("Because seatunnel need to use
scp or rsync for file transfer, " +
+ "seatunnel need clickhouse server-side access. If each spark node
and clickhouse server are configured with password-free login, " +
+ "you can configure this option to true, otherwise you need to
configure the corresponding node password in the node_pass configuration");
/**
* The password of Clickhouse server node
*/
@@ -106,4 +115,11 @@ public class ClickhouseConfig {
public static final Option<Map<String, String>> CLICKHOUSE_PREFIX =
Options.key("clickhouse").mapType()
.defaultValue(Collections.emptyMap()).withDescription("Clickhouse
custom config");
+ public static final Option<String> FILE_FIELDS_DELIMITER =
Options.key("file_fields_delimiter").stringType()
+ .defaultValue("\t").withDescription("ClickhouseFile uses csv format to
temporarily save data. If the data in the row contains the delimiter value of
csv," +
+ " it may cause program exceptions. Avoid this with this
configuration. Value string has to be an exactly one character long");
+
+ public static final Option<String> FILE_TEMP_PATH =
Options.key("file_temp_path").stringType()
+
.defaultValue("/tmp/seatunnel/clickhouse-local/file").withDescription("The
directory where ClickhouseFile stores temporary files locally.");
+
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
index 283c1c4f9..fb00ee995 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java
@@ -20,10 +20,13 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+@Data
public class FileReaderOption implements Serializable {
private ShardMetadata shardMetadata;
@@ -35,90 +38,29 @@ public class FileReaderOption implements Serializable {
private Map<String, String> nodeUser;
private Map<String, String> nodePassword;
private SeaTunnelRowType seaTunnelRowType;
+ private boolean compatibleMode;
+ private String fileTempPath;
+ private String fileFieldsDelimiter;
public FileReaderOption(ShardMetadata shardMetadata, Map<String, String>
tableSchema,
List<String> fields, String clickhouseLocalPath,
ClickhouseFileCopyMethod copyMethod,
Map<String, String> nodeUser,
- Map<String, String> nodePassword) {
+ boolean nodeFreePass,
+ Map<String, String> nodePassword,
+ boolean compatibleMode,
+ String fileTempPath,
+ String fileFieldsDelimiter) {
this.shardMetadata = shardMetadata;
this.tableSchema = tableSchema;
this.fields = fields;
this.clickhouseLocalPath = clickhouseLocalPath;
this.copyMethod = copyMethod;
this.nodeUser = nodeUser;
- this.nodePassword = nodePassword;
- }
-
- public SeaTunnelRowType getSeaTunnelRowType() {
- return seaTunnelRowType;
- }
-
- public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
- public boolean isNodeFreePass() {
- return nodeFreePass;
- }
-
- public void setNodeFreePass(boolean nodeFreePass) {
this.nodeFreePass = nodeFreePass;
- }
-
- public String getClickhouseLocalPath() {
- return clickhouseLocalPath;
- }
-
- public void setClickhouseLocalPath(String clickhouseLocalPath) {
- this.clickhouseLocalPath = clickhouseLocalPath;
- }
-
- public ClickhouseFileCopyMethod getCopyMethod() {
- return copyMethod;
- }
-
- public void setCopyMethod(ClickhouseFileCopyMethod copyMethod) {
- this.copyMethod = copyMethod;
- }
-
- public Map<String, String> getNodeUser() {
- return nodeUser;
- }
-
- public void setNodeUser(Map<String, String> nodeUser) {
- this.nodeUser = nodeUser;
- }
-
- public Map<String, String> getNodePassword() {
- return nodePassword;
- }
-
- public void setNodePassword(Map<String, String> nodePassword) {
this.nodePassword = nodePassword;
- }
-
- public ShardMetadata getShardMetadata() {
- return shardMetadata;
- }
-
- public void setShardMetadata(ShardMetadata shardMetadata) {
- this.shardMetadata = shardMetadata;
- }
-
- public Map<String, String> getTableSchema() {
- return tableSchema;
- }
-
- public void setTableSchema(Map<String, String> tableSchema) {
- this.tableSchema = tableSchema;
- }
-
- public List<String> getFields() {
- return fields;
- }
-
- public void setFields(List<String> fields) {
- this.fields = fields;
+ this.compatibleMode = compatibleMode;
+ this.fileFieldsDelimiter = fileFieldsDelimiter;
+ this.fileTempPath = fileTempPath;
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 973478a8b..7aa5010f2 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -18,11 +18,15 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_ADDRESS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
@@ -31,7 +35,10 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -46,8 +53,8 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.Clickhouse
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
@@ -62,10 +69,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
@AutoService(SeaTunnelSink.class)
-public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow,
ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
+public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow,
ClickhouseSinkState, CKFileCommitInfo, CKFileAggCommitInfo> {
private FileReaderOption readerOption;
@@ -85,6 +93,10 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
}
Map<String, Object> defaultConfigs = ImmutableMap.<String,
Object>builder()
.put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName())
+ .put(NODE_FREE_PASSWORD.key(), NODE_FREE_PASSWORD.defaultValue())
+ .put(COMPATIBLE_MODE.key(), COMPATIBLE_MODE.defaultValue())
+ .put(FILE_TEMP_PATH.key(), FILE_TEMP_PATH.defaultValue())
+ .put(FILE_FIELDS_DELIMITER.key(),
FILE_FIELDS_DELIMITER.defaultValue())
.build();
config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
@@ -126,8 +138,13 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
configObject ->
configObject.toConfig().getString(PASSWORD.key())));
proxy.close();
+
+ if (config.getString(FILE_FIELDS_DELIMITER.key()).length() != 1) {
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
FILE_FIELDS_DELIMITER.key() + " must be a single character");
+ }
this.readerOption = new FileReaderOption(shardMetadata, tableSchema,
fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()),
-
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser,
nodePassword);
+
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser,
config.getBoolean(NODE_FREE_PASSWORD.key()), nodePassword,
+ config.getBoolean(COMPATIBLE_MODE.key()),
config.getString(FILE_TEMP_PATH.key()),
config.getString(FILE_FIELDS_DELIMITER.key()));
}
@Override
@@ -141,7 +158,22 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
}
@Override
- public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState>
createWriter(SinkWriter.Context context) throws IOException {
+ public SinkWriter<SeaTunnelRow, CKFileCommitInfo, ClickhouseSinkState>
createWriter(SinkWriter.Context context) throws IOException {
return new ClickhouseFileSinkWriter(readerOption, context);
}
+
+ @Override
+ public Optional<Serializer<CKFileCommitInfo>> getCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<SinkAggregatedCommitter<CKFileCommitInfo,
CKFileAggCommitInfo>> createAggregatedCommitter() throws IOException {
+ return Optional.of(new
ClickhouseFileSinkAggCommitter(this.readerOption));
+ }
+
+ @Override
+ public Optional<Serializer<CKFileAggCommitInfo>>
getAggregatedCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
new file mode 100644
index 000000000..80ce83aa5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
+
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.ClickHouseResponse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClickhouseFileSinkAggCommitter implements
SinkAggregatedCommitter<CKFileCommitInfo, CKFileAggCommitInfo> {
+
+ private final ClickhouseProxy proxy;
+ private final ClickhouseTable clickhouseTable;
+
+ public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) {
+ proxy = new
ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
+ clickhouseTable =
proxy.getClickhouseTable(readerOption.getShardMetadata().getDatabase(),
+ readerOption.getShardMetadata().getTable());
+ }
+
+ @Override
+ public List<CKFileAggCommitInfo> commit(List<CKFileAggCommitInfo>
aggregatedCommitInfo) throws IOException {
+ aggregatedCommitInfo.forEach(commitInfo ->
commitInfo.getDetachedFiles().forEach((shard, files) -> {
+ try {
+ this.attachFileToClickhouse(shard, files);
+ } catch (ClickHouseException e) {
+ throw new SeaTunnelException("failed commit file to
clickhouse", e);
+ }
+ }));
+ return new ArrayList<>();
+ }
+
+ @Override
+ public CKFileAggCommitInfo combine(List<CKFileCommitInfo> commitInfos) {
+ Map<Shard, List<String>> files = new HashMap<>();
+ commitInfos.forEach(infos -> infos.getDetachedFiles().forEach((shard,
file) -> {
+ if (files.containsKey(shard)) {
+ files.get(shard).addAll(file);
+ } else {
+ files.put(shard, file);
+ }
+ }));
+ return new CKFileAggCommitInfo(files);
+ }
+
+ @Override
+ public void abort(List<CKFileAggCommitInfo> aggregatedCommitInfo) throws
Exception {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ proxy.close();
+ }
+
+ private void attachFileToClickhouse(Shard shard, List<String>
clickhouseLocalFiles) throws ClickHouseException {
+ ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
+ for (String clickhouseLocalFile : clickhouseLocalFiles) {
+ ClickHouseResponse response = request.query(String.format("ALTER
TABLE %s ATTACH PART '%s'",
+ clickhouseTable.getLocalTableName(),
+
clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") +
1))).executeAndWait();
+ response.close();
+ }
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
index 2d9ccf220..829d8e005 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
@@ -18,10 +18,14 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
@@ -44,6 +48,6 @@ public class ClickhouseFileSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME,
PASSWORD, CLICKHOUSE_LOCAL_PATH)
- .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_PASS).build();
+ .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD,
NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 6f24e2c79..14b5c181c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -27,17 +27,15 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.Clickhouse
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
-import com.clickhouse.client.ClickHouseException;
-import com.clickhouse.client.ClickHouseRequest;
-import com.clickhouse.client.ClickHouseResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -48,7 +46,6 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,24 +55,32 @@ import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
-public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow,
CKCommitInfo, ClickhouseSinkState> {
- private static final String CLICKHOUSE_LOCAL_FILE_PREFIX =
"/tmp/clickhouse-local/seatunnel-file";
+public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow,
CKFileCommitInfo, ClickhouseSinkState> {
+
+ private static final String CK_LOCAL_CONFIG_TEMPLATE = "<yandex><path> %s
</path> <users><default><password/> <profile>default</profile>
<quota>default</quota>" +
+
"<access_management>1</access_management></default></users><profiles><default/></profiles><quotas><default/></quotas></yandex>";
+ private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX =
"/local_data.log";
private static final int UUID_LENGTH = 10;
private final FileReaderOption readerOption;
private final ShardRouter shardRouter;
private final ClickhouseProxy proxy;
private final ClickhouseTable clickhouseTable;
private final Map<Shard, List<String>> shardLocalDataPaths;
- private final Map<Shard, List<SeaTunnelRow>> rowCache;
+ private final Map<Shard, FileChannel> rowCache;
+
+ private final Map<Shard, String> shardTempFile;
+
+ private final SinkWriter.Context context;
public ClickhouseFileSinkWriter(FileReaderOption readerOption,
SinkWriter.Context context) {
this.readerOption = readerOption;
+ this.context = context;
proxy = new
ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
shardRouter = new ShardRouter(proxy,
this.readerOption.getShardMetadata());
clickhouseTable =
proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(),
this.readerOption.getShardMetadata().getTable());
rowCache = new HashMap<>(Common.COLLECTION_SIZE);
-
+ shardTempFile = new HashMap<>();
nodePasswordCheck();
// find file local save path of each node
@@ -90,7 +95,20 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
@Override
public void write(SeaTunnelRow element) throws IOException {
Shard shard = shardRouter.getShard(element);
- rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(element);
+ FileChannel channel = rowCache.computeIfAbsent(shard, k -> {
+ try {
+ String uuid = UUID.randomUUID().toString().substring(0,
UUID_LENGTH).replaceAll("-", "_");
+ String clickhouseLocalFile = String.format("%s/%s",
readerOption.getFileTempPath(), uuid);
+ FileUtils.forceMkdir(new File(clickhouseLocalFile));
+ String clickhouseLocalFileTmpFile = clickhouseLocalFile +
CLICKHOUSE_LOCAL_FILE_SUFFIX;
+ shardTempFile.put(shard, clickhouseLocalFileTmpFile);
+ return FileChannel.open(Paths.get(clickhouseLocalFileTmpFile),
StandardOpenOption.WRITE,
+ StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
+ } catch (IOException e) {
+ throw new
ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "can't
create new file to save tmp data", e);
+ }
+ });
+ saveDataToFile(channel, element);
}
private void nodePasswordCheck() {
@@ -105,62 +123,67 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
}
@Override
- public Optional<CKCommitInfo> prepareCommit() throws IOException {
- return Optional.empty();
+ public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
+ for (FileChannel channel : rowCache.values()) {
+ channel.close();
+ }
+ Map<Shard, List<String>> detachedFiles = new HashMap<>();
+ shardTempFile.forEach((shard, path) -> {
+ List<String> clickhouseLocalFiles = null;
+ try {
+ clickhouseLocalFiles = generateClickhouseLocalFiles(path);
+ // move file to server
+ moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+ detachedFiles.put(shard, clickhouseLocalFiles);
+ } catch (Exception e) {
+ throw new
ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data
into clickhouse file error", e);
+ } finally {
+ if (clickhouseLocalFiles != null &&
!clickhouseLocalFiles.isEmpty()) {
+ // clear local file
+ clearLocalFileDirectory(clickhouseLocalFiles);
+ }
+ }
+ });
+ rowCache.clear();
+ shardTempFile.clear();
+ return Optional.of(new CKFileCommitInfo(detachedFiles));
}
@Override
public void abortPrepare() {
-
}
@Override
public void close() throws IOException {
- rowCache.forEach(this::flush);
+ for (FileChannel channel : rowCache.values()) {
+ channel.close();
+ }
}
- private void flush(Shard shard, List<SeaTunnelRow> rows) {
- try {
- // generate clickhouse local file
- // TODO generate file by sub rows to save memory
- List<String> clickhouseLocalFiles =
generateClickhouseLocalFiles(rows);
- // move file to server
- attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
- // clear local file
- clearLocalFileDirectory(clickhouseLocalFiles);
- } catch (Exception e) {
- throw new
ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data
into clickhouse file error", e);
- }
+ private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row)
throws IOException {
+ String data = this.readerOption.getFields().stream().map(field ->
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+
.collect(Collectors.joining(readerOption.getFileFieldsDelimiter())) + "\n";
+ MappedByteBuffer buffer =
fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+ data.getBytes(StandardCharsets.UTF_8).length);
+ buffer.put(data.getBytes(StandardCharsets.UTF_8));
}
- private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows)
throws IOException,
+ private List<String> generateClickhouseLocalFiles(String
clickhouseLocalFileTmpFile) throws IOException,
InterruptedException {
- if (rows.isEmpty()) {
- return Collections.emptyList();
- }
- String uuid = UUID.randomUUID().toString().substring(0,
UUID_LENGTH).replaceAll("-", "_");
- String clickhouseLocalFile = String.format("%s/%s",
CLICKHOUSE_LOCAL_FILE_PREFIX, uuid);
- FileUtils.forceMkdir(new File(clickhouseLocalFile));
- String clickhouseLocalFileTmpFile = clickhouseLocalFile +
"/local_data.log";
- try (FileChannel fileChannel =
FileChannel.open(Paths.get(clickhouseLocalFileTmpFile),
StandardOpenOption.WRITE,
- StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
- String data = rows.stream()
- .map(row -> this.readerOption.getFields().stream().map(field
->
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
- .collect(Collectors.joining("\t")))
- .collect(Collectors.joining("\n"));
- MappedByteBuffer buffer =
fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
- data.getBytes(StandardCharsets.UTF_8).length);
- buffer.put(data.getBytes(StandardCharsets.UTF_8));
- }
-
+ // temp file path format prefix/<uuid>/suffix
+ String[] tmpStrArr = clickhouseLocalFileTmpFile.split("/");
+ String uuid = tmpStrArr[tmpStrArr.length - 2];
List<String> localPaths =
Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" "))
.collect(Collectors.toList());
+ String clickhouseLocalFile = clickhouseLocalFileTmpFile.substring(0,
clickhouseLocalFileTmpFile.length() - CLICKHOUSE_LOCAL_FILE_SUFFIX.length());
List<String> command = new ArrayList<>(localPaths);
if (localPaths.size() == 1) {
command.add("local");
}
command.add("--file");
command.add(clickhouseLocalFileTmpFile);
+ command.add("--format_csv_delimiter");
+ command.add("\"" + readerOption.getFileFieldsDelimiter() + "\"");
command.add("-S");
command.add("\"" + this.readerOption.getFields().stream().map(field ->
field + " " +
readerOption.getTableSchema().get(field)).collect(Collectors.joining(",")) +
"\"");
command.add("-N");
@@ -178,8 +201,19 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
}
}).collect(Collectors.joining(",")),
uuid));
- command.add("--path");
- command.add("\"" + clickhouseLocalFile + "\"");
+ if (readerOption.isCompatibleMode()) {
+ String ckLocalConfigPath = String.format("%s/%s/config.xml",
readerOption.getFileTempPath(), uuid);
+ try (FileWriter writer = new FileWriter(ckLocalConfigPath)) {
+ writer.write(String.format(CK_LOCAL_CONFIG_TEMPLATE,
clickhouseLocalFile));
+ } catch (IOException e) {
+ throw new
ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Error
occurs when create ck local config");
+ }
+ command.add("--config-file");
+ command.add("\"" + ckLocalConfigPath + "\"");
+ } else {
+ command.add("--path");
+ command.add("\"" + clickhouseLocalFile + "\"");
+ }
log.info("Generate clickhouse local file command: {}", String.join("
", command));
ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c",
String.join(" ", command));
Process start = processBuilder.start();
@@ -192,6 +226,14 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
log.info(line);
}
}
+ try (InputStream inputStream = start.getErrorStream();
+ InputStreamReader inputStreamReader = new
InputStreamReader(inputStream);
+ BufferedReader bufferedReader = new
BufferedReader(inputStreamReader)) {
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ log.error(line);
+ }
+ }
start.waitFor();
File file = new File(clickhouseLocalFile + "/data/_local/" +
clickhouseTable.getLocalTableName());
if (!file.exists()) {
@@ -204,10 +246,18 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
return Arrays.stream(files)
.filter(File::isDirectory)
.filter(f -> !"detached".equals(f.getName()))
- .map(File::getAbsolutePath).collect(Collectors.toList());
+ .map(f -> {
+ File newFile = new File(f.getParent() + "/" + f.getName() +
"_" + context.getIndexOfSubtask());
+ if (f.renameTo(newFile)) {
+ return newFile;
+ } else {
+ log.warn("rename file failed, will continue move file, but
maybe cause file conflict");
+ return f;
+ }
+ }).map(File::getAbsolutePath).collect(Collectors.toList());
}
- private void attachClickhouseLocalFileToServer(Shard shard, List<String>
clickhouseLocalFiles) throws ClickHouseException {
+ private void moveClickhouseLocalFileToServer(Shard shard, List<String>
clickhouseLocalFiles) {
String hostAddress = shard.getNode().getAddress().getHostName();
String user = readerOption.getNodeUser().getOrDefault(hostAddress,
"root");
String password =
readerOption.getNodePassword().getOrDefault(hostAddress, null);
@@ -215,22 +265,15 @@ public class ClickhouseFileSinkWriter implements
SinkWriter<SeaTunnelRow, CKComm
fileTransfer.init();
fileTransfer.transferAndChown(clickhouseLocalFiles,
shardLocalDataPaths.get(shard).get(0) + "detached/");
fileTransfer.close();
- ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
- for (String clickhouseLocalFile : clickhouseLocalFiles) {
- ClickHouseResponse response = request.query(String.format("ALTER
TABLE %s ATTACH PART '%s'",
- clickhouseTable.getLocalTableName(),
-
clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") +
1))).executeAndWait();
- response.close();
- }
}
private void clearLocalFileDirectory(List<String> clickhouseLocalFiles) {
String clickhouseLocalFile = clickhouseLocalFiles.get(0);
- String localFileDir = clickhouseLocalFile.substring(0,
CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1);
+ String localFileDir = clickhouseLocalFile.substring(0,
readerOption.getFileTempPath().length() + UUID_LENGTH + 1);
try {
File file = new File(localFileDir);
if (file.exists()) {
- FileUtils.deleteDirectory(new File(localFileDir));
+ FileUtils.deleteDirectory(file);
}
} catch (IOException e) {
throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.DELETE_DIRECTORY_FIELD,
"Unable to delete directory " + localFileDir, e);
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java
similarity index 58%
copy from
seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
copy to
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java
index 2a4205016..9962b9dcb 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java
@@ -15,19 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-public class ClickhouseFactoryTest {
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class CKFileAggCommitInfo implements Serializable {
+
+ private Map<Shard, List<String>> detachedFiles;
- @Test
- public void testOptionRule() {
- Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule());
- Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule());
- }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java
similarity index 58%
copy from
seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
copy to
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java
index 2a4205016..1b5399b7c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java
@@ -15,19 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.state;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-public class ClickhouseFactoryTest {
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class CKFileCommitInfo implements Serializable {
+
+ private Map<Shard, List<String>> detachedFiles;
- @Test
- public void testOptionRule() {
- Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule());
- Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule());
- }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
index 2a4205016..e6c50b061 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory;
import org.junit.jupiter.api.Assertions;
@@ -29,5 +30,6 @@ public class ClickhouseFactoryTest {
public void testOptionRule() {
Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule());
Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule());
+ Assertions.assertNotNull((new
ClickhouseFileSinkFactory()).optionRule());
}
}