liunaijie commented on code in PR #8747:
URL: https://github.com/apache/seatunnel/pull/8747#discussion_r1957467592


##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java:
##########
@@ -60,4 +81,91 @@ public OptionRule optionRule() {
                         KEY_PATH)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+
+        List<ClickHouseNode> nodes =
+                ClickhouseUtil.createNodes(
+                        readonlyConfig.get(HOST),
+                        readonlyConfig.get(DATABASE),
+                        readonlyConfig.get(SERVER_TIME_ZONE),
+                        readonlyConfig.get(USERNAME),
+                        readonlyConfig.get(PASSWORD),
+                        null);
+
+        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+        Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE));
+        ClickhouseTable table =
+                proxy.getClickhouseTable(
+                        proxy.getClickhouseConnection(),
+                        readonlyConfig.get(DATABASE),
+                        readonlyConfig.get(TABLE));
+        String shardKey = null;
+        String shardKeyType = null;
+        if (readonlyConfig.getOptional(SHARDING_KEY).isPresent()) {
+            shardKey = readonlyConfig.getOptional(SHARDING_KEY).get();
+            shardKeyType = tableSchema.get(shardKey);
+        }
+
+        ShardMetadata shardMetadata =
+                new ShardMetadata(
+                        shardKey,
+                        shardKeyType,
+                        readonlyConfig.get(DATABASE),
+                        readonlyConfig.get(TABLE),
+                        table.getEngine(),
+                        true,
+                        new Shard(1, 1, nodes.get(0)),
+                        readonlyConfig.get(USERNAME),
+                        readonlyConfig.get(PASSWORD));
+        List<String> fields = new ArrayList<>(tableSchema.keySet());
+
+        Map<String, String> nodeUser =
+                
readonlyConfig.toConfig().getObjectList(NODE_PASS.key()).stream()
+                        .collect(
+                                Collectors.toMap(
+                                        configObject ->
+                                                
configObject.toConfig().getString(NODE_ADDRESS),
+                                        configObject ->
+                                                
configObject.toConfig().hasPath(USERNAME.key())
+                                                        ? configObject
+                                                                .toConfig()
+                                                                
.getString(USERNAME.key())
+                                                        : "root"));
+
+        Map<String, String> nodePassword =
+                readonlyConfig.get(NODE_PASS).stream()
+                        .collect(
+                                Collectors.toMap(
+                                        NodePassConfig::getNodeAddress,
+                                        NodePassConfig::getPassword));
+
+        proxy.close();
+
+        if (readonlyConfig.get(FILE_FIELDS_DELIMITER).length() != 1) {
+            throw new ClickhouseConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    FILE_FIELDS_DELIMITER.key() + " must be a single 
character");
+        }
+        FileReaderOption readerOption =
+                new FileReaderOption(
+                        shardMetadata,
+                        tableSchema,
+                        fields,
+                        readonlyConfig.get(CLICKHOUSE_LOCAL_PATH),
+                        
ClickhouseFileCopyMethod.from(readonlyConfig.get(COPY_METHOD).getName()),
+                        nodeUser,
+                        readonlyConfig.get(NODE_FREE_PASSWORD),
+                        nodePassword,
+                        readonlyConfig.get(COMPATIBLE_MODE),
+                        readonlyConfig.get(FILE_TEMP_PATH),
+                        readonlyConfig.get(FILE_FIELDS_DELIMITER),
+                        readonlyConfig.get(KEY_PATH));
+
+        readerOption.setSeaTunnelRowType(catalogTable.getSeaTunnelRowType());
+        return () -> new ClickhouseFileSink(readerOption);

Review Comment:
   When implement `createSink` method, we can delete the `prepare` method in 
`ClickhouseSink`



##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSinkOptions.java:
##########
@@ -0,0 +1,107 @@
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+
+public class ClickhouseSinkOptions {
+
+    /** Bulk size of clickhouse jdbc */
+    public static final Option<Integer> BULK_SIZE =
+            Options.key("bulk_size")
+                    .intType()
+                    .defaultValue(20000)
+                    .withDescription("Bulk size of clickhouse jdbc");
+
+    /** Clickhouse table name */
+    public static final Option<String> TABLE =
+            Options.key("table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Clickhouse table name");
+
+    /** Split mode when table is distributed engine */
+    public static final Option<Boolean> SPLIT_MODE =
+            Options.key("split_mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Split mode when table is distributed 
engine");
+
+    /** When split_mode is true, the sharding_key use for split */
+    public static final Option<String> SHARDING_KEY =
+            Options.key("sharding_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("When split_mode is true, the 
sharding_key use for split");
+
+    public static final Option<String> PRIMARY_KEY =
+            Options.key("primary_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Mark the primary key column from clickhouse 
table, and based on primary key execute INSERT/UPDATE/DELETE to clickhouse 
table");
+
+    public static final Option<Boolean> SUPPORT_UPSERT =
+            Options.key("support_upsert")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Support upsert row by query primary 
key");
+
+    public static final Option<Boolean> ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE =
+            Options.key("allow_experimental_lightweight_delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Allow experimental lightweight delete based on 
`*MergeTree` table engine");
+
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")

Review Comment:
   Please also add this option parameter to the `Factory.optionRule()` method.
   Every parameter in the `Options` class should be included in this method, 
with a description indicating whether it is optional or required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to