EricJoy2048 commented on code in PR #4405: URL: https://github.com/apache/seatunnel/pull/4405#discussion_r1664062138
########## docs/en/connector-v2/sink/Hudi.md: ########## @@ -0,0 +1,97 @@ +# Hudi + +> Hudi sink connector + +## Description + +Used to write data to Hudi. + +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) + Review Comment: `- [x] [cdc](../../concept/connector-v2-features.md)` ########## docs/en/connector-v2/sink/Hudi.md: ########## @@ -0,0 +1,97 @@ +# Hudi + +> Hudi sink connector + +## Description + +Used to write data to Hudi. + +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------------------|--------|----------|---------------| +| table_name | string | yes | - | +| table_path | string | yes | - | +| conf_files | string | no | - | +| record_key_fields | string | no | - | +| partition_fields | string | no | - | +| table_type | string | no | copy_on_write | +| op_type | string | no | insert | +| batch_interval_ms | Int | no | 1000 | +| insert_shuffle_parallelism | Int | no | 2 | +| upsert_shuffle_parallelism | Int | no | 2 | +| min_commits_to_keep | Int | no | 20 | +| max_commits_to_keep | Int | no | 30 | +| common-options | config | no | - | + +### table_name [string] + +`table_name` The name of hudi table. + +### table_path [string] Review Comment: `table_dfs_path` is better. `table_path` is same as the `JDBC Connector` and may confusion with `table_path` in other connectors. ########## docs/en/connector-v2/source/Hudi.md: ########## @@ -1,90 +0,0 @@ -# Hudi Review Comment: delete this file? An empty file is meaningless. ########## seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_PATH; + +@AutoService(Factory.class) +public class HudiSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Hudi"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().required(TABLE_PATH, TABLE_NAME).build(); Review Comment: You lost some options here, can you add all options here which you put in the document? ########## seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.hudi.sink.committer.HudiSinkAggregatedCommitter; +import org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.HudiSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class HudiSink + implements SeaTunnelSink< + SeaTunnelRow, HudiSinkState, 
HudiCommitInfo, HudiAggregatedCommitInfo> { Review Comment: Does this connector support `SupportMultiTableSink` ? ########## docs/en/connector-v2/sink/Hudi.md: ########## @@ -0,0 +1,97 @@ +# Hudi + +> Hudi sink connector + +## Description + +Used to write data to Hudi. + +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------------------|--------|----------|---------------| +| table_name | string | yes | - | +| table_path | string | yes | - | +| conf_files | string | no | - | +| record_key_fields | string | no | - | +| partition_fields | string | no | - | +| table_type | string | no | copy_on_write | +| op_type | string | no | insert | +| batch_interval_ms | Int | no | 1000 | +| insert_shuffle_parallelism | Int | no | 2 | +| upsert_shuffle_parallelism | Int | no | 2 | +| min_commits_to_keep | Int | no | 20 | +| max_commits_to_keep | Int | no | 30 | +| common-options | config | no | - | + +### table_name [string] + +`table_name` The name of hudi table. + +### table_path [string] + +`table.path` The dfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'. + +### table_type [string] + +`table_type` The type of hudi table. + +### conf_files [string] + +`conf_files` The environment conf file path list(local path), which used to init hdfs client to read hudi table file. The example is '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'. + +### op_type [boolean] Review Comment: ```suggestion ### op_type [enum] ``` ########## docs/en/connector-v2/sink/Hudi.md: ########## @@ -0,0 +1,97 @@ +# Hudi + +> Hudi sink connector + +## Description + +Used to write data to Hudi. 
+ +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------------------|--------|----------|---------------| +| table_name | string | yes | - | +| table_path | string | yes | - | +| conf_files | string | no | - | +| record_key_fields | string | no | - | +| partition_fields | string | no | - | +| table_type | string | no | copy_on_write | +| op_type | string | no | insert | +| batch_interval_ms | Int | no | 1000 | +| insert_shuffle_parallelism | Int | no | 2 | +| upsert_shuffle_parallelism | Int | no | 2 | +| min_commits_to_keep | Int | no | 20 | +| max_commits_to_keep | Int | no | 30 | +| common-options | config | no | - | + +### table_name [string] + +`table_name` The name of hudi table. + +### table_path [string] + +`table.path` The dfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'. Review Comment: `table.path`? It is not the same as the name in `Options` (`table_path`). ########## docs/en/connector-v2/sink/Hudi.md: ########## @@ -0,0 +1,97 @@ +# Hudi + +> Hudi sink connector + +## Description + +Used to write data to Hudi. + +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------------------|--------|----------|---------------| +| table_name | string | yes | - | +| table_path | string | yes | - | +| conf_files | string | no | - | +| record_key_fields | string | no | - | +| partition_fields | string | no | - | +| table_type | string | no | copy_on_write | +| op_type | string | no | insert | +| batch_interval_ms | Int | no | 1000 | +| insert_shuffle_parallelism | Int | no | 2 | +| upsert_shuffle_parallelism | Int | no | 2 | +| min_commits_to_keep | Int | no | 20 | +| max_commits_to_keep | Int | no | 30 | +| common-options | config | no | - | + +### table_name [string] + +`table_name` The name of hudi table.
+ +### table_path [string] + +`table.path` The dfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'. + +### table_type [string] + +`table_type` The type of hudi table. + +### conf_files [string] Review Comment: `conf_files_path` is better. ########## seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; +import java.util.Optional; + +@Data +@Builder(builderClassName = "Builder") +public class HudiSinkConfig implements Serializable { + + private static final long serialVersionUID = 2L; + + private String tableName; + + private String tablePath; + + private int insertShuffleParallelism; + + private int upsertShuffleParallelism; + + private int minCommitsToKeep; + + private int maxCommitsToKeep; + + private HoodieTableType tableType; + + private WriteOperationType opType; + + private String confFile; + + private int batchIntervalMs; + + private String recordKeyFields; + + private String partitionFields; + + public static HudiSinkConfig of(ReadonlyConfig config) { + HudiSinkConfig.Builder builder = HudiSinkConfig.builder(); + Optional<Integer> optionalInsertShuffleParallelism = + config.getOptional(HudiOptions.INSERT_SHUFFLE_PARALLELISM); + Optional<Integer> optionalUpsertShuffleParallelism = + config.getOptional(HudiOptions.UPSERT_SHUFFLE_PARALLELISM); + Optional<Integer> optionalMinCommitsToKeep = + config.getOptional(HudiOptions.MIN_COMMITS_TO_KEEP); + Optional<Integer> optionalMaxCommitsToKeep = + config.getOptional(HudiOptions.MAX_COMMITS_TO_KEEP); + + Optional<HoodieTableType> optionalTableType = config.getOptional(HudiOptions.TABLE_TYPE); + Optional<WriteOperationType> optionalOpType = config.getOptional(HudiOptions.OP_TYPE); + + Optional<Integer> optionalBatchIntervalMs = + config.getOptional(HudiOptions.BATCH_INTERVAL_MS); + + Optional<String> partitionFields = config.getOptional(HudiOptions.PARTITION_FIELDS); + + Optional<String> recordKeyFields = config.getOptional(HudiOptions.RECORD_KEY_FIELDS); + + 
builder.confFile(config.get(HudiOptions.CONF_FILES)); + builder.tableName(config.get(HudiOptions.TABLE_NAME)); + builder.tablePath(config.get(HudiOptions.TABLE_PATH)); + builder.tableType(optionalTableType.orElseGet(HudiOptions.TABLE_TYPE::defaultValue)); + builder.opType(optionalOpType.orElseGet(HudiOptions.OP_TYPE::defaultValue)); + + builder.batchIntervalMs( + optionalBatchIntervalMs.orElseGet(HudiOptions.BATCH_INTERVAL_MS::defaultValue)); + + builder.partitionFields( + partitionFields.orElseGet(HudiOptions.PARTITION_FIELDS::defaultValue)); + + builder.recordKeyFields( + recordKeyFields.orElseGet(HudiOptions.RECORD_KEY_FIELDS::defaultValue)); + + builder.insertShuffleParallelism( Review Comment: Why not use `builder.insertShuffleParallelism(config.get(HudiOptions.INSERT_SHUFFLE_PARALLELISM))`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
