This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new db1caee89 [FLINK-38837][docs][pipeline-connector/hudi] Add 
documentation for Hudi connector and improve the configuration
db1caee89 is described below

commit db1caee89e4572ec776373fa8718dcf706415a1e
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Dec 26 20:46:31 2025 +0800

    [FLINK-38837][docs][pipeline-connector/hudi] Add documentation for Hudi 
connector and improve the configuration
    
     This closes #4200.
---
 .../docs/connectors/pipeline-connectors/hudi.md    | 222 +++++++++++++++++++++
 .../docs/connectors/pipeline-connectors/hudi.md    | 222 +++++++++++++++++++++
 .../flink/cdc/connectors/hudi/sink/HudiConfig.java |  79 +-------
 .../cdc/connectors/hudi/sink/HudiDataSink.java     |  20 +-
 .../connectors/hudi/sink/HudiDataSinkFactory.java  |  18 +-
 .../cdc/connectors/hudi/sink/util/ConfigUtils.java |  14 +-
 .../hudi/sink/HudiDataSinkFactoryTest.java         |   4 +-
 .../cdc/pipeline/tests/MySqlToHudiE2eITCase.java   |   6 +-
 8 files changed, 477 insertions(+), 108 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/hudi.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/hudi.md
new file mode 100644
index 000000000..c5e49ae93
--- /dev/null
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/hudi.md
@@ -0,0 +1,222 @@
+---
+title: "Hudi"
+weight: 10
+type: docs
+aliases:
+- /connectors/pipeline-connectors/hudi
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hudi Pipeline 连接器
+
+Hudi Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[Apache 
Hudi](https://hudi.apache.org)。 本文档介绍如何设置 Hudi Pipeline 连接器。
+
+## 连接器的功能
+* 自动创建 Hudi 表
+* 自动的表结构变更同步
+* 数据实时同步
+
+如何创建 Pipeline
+----------------
+
+从 MySQL 读取数据同步到 Hudi 的 Pipeline 可以定义如下:
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+
+  server-id: 5401-5404
+
+sink:
+  type: hudi 
+  name: Hudi Sink
+  path: /path/warehouse
+  hoodie.table.type: MERGE_ON_READ
+
+transform:
+  - source-table: adb.\.*
+    table-options: ordering.fields=ts1
+  - source-table: bdb.user_table_[0-9]+
+    table-options: ordering.fields=ts2
+
+pipeline:
+  name: MySQL to Hudi Pipeline
+  parallelism: 4
+```
+
+Pipeline 连接器配置项
+----------------
+<div class="highlight">
+<table class="colwidths-auto docutils">
+   <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>type</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>指定要使用的连接器, 这里需要设置成 <code>'hudi'</code></td>
+    </tr>
+    <tr>
+      <td>path</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>为目标 <code>hudi</code> 表指定基本路径</td>
+    </tr>
+    <tr>
+      <td>name</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Sink 的名称</td>
+    </tr>
+    <tr>
+      <td>hoodie.table.type</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>目标 <code>hudi</code> 表的类型, 目前仅支持 <code>MERGE_ON_READ</code></td>
+    </tr>
+    <tr>
+      <td>index.type</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>目标 <code>hudi</code> 表的索引类型, 目前仅支持 <code>BUCKET</code></td>
+    </tr>
+    <tr>
+      <td>table.properties.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>将 <code>hudi</code> 表支持的参数传递给 pipeline,参考 <a 
href="https://hudi.apache.org/docs/configurations/#FLINK_SQL";>Hudi table 
options</a></td>
+    </tr>
+    </tbody>
+</table>    
+</div>
+
+使用说明
+--------
+
+* 源表必须定义主键。用户可通过 <code>transform</code> 规则来自覆盖主键配置,或设置 Ordering Fields 配置。
+* 目前只支持 MERGE_ON_READ 表类型,以及简单 BUCKET 索引。
+* 暂不支持 exactly-once 语义,连接器通过 at-least-once 语义和主键表实现幂等写。
+
+数据类型映射
+----------------
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">CDC type</th>
+        <th class="text-left">Hudi type</th>
+        <th class="text-left" style="width:60%;">NOTE</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>TINYINT</td>
+      <td>TINYINT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>SMALLINT</td>
+      <td>SMALLINT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>INT</td>
+      <td>INT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BIGINT</td>
+      <td>BIGINT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>FLOAT</td>
+      <td>FLOAT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DOUBLE</td>
+      <td>DOUBLE</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DECIMAL(p, s)</td>
+      <td>DECIMAL(p, s)</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BOOLEAN</td>
+      <td>BOOLEAN</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DATE</td>
+      <td>DATE</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>TIME</td>
+      <td>TIME</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP</td>
+      <td>TIMESTAMP</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BINARY</td>
+      <td>BINARY</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>CHAR(n)</td>
+      <td>CHAR(n)</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>VARCHAR(n)</td>
+      <td>VARCHAR(n)</td>
+      <td></td>
+    </tr>
+    </tbody>
+</table>
+</div>
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/pipeline-connectors/hudi.md 
b/docs/content/docs/connectors/pipeline-connectors/hudi.md
new file mode 100644
index 000000000..4de3a683c
--- /dev/null
+++ b/docs/content/docs/connectors/pipeline-connectors/hudi.md
@@ -0,0 +1,222 @@
+---
+title: "Hudi"
+weight: 10
+type: docs
+aliases:
+- /connectors/pipeline-connectors/hudi
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hudi Pipeline Connector
+
+The Hudi Pipeline Connector functions as a *Data Sink* for data pipelines, 
enabling data writes to [Apache Hudi](https://hudi.apache.org) tables. This 
document explains how to configure the connector.
+
+## What can the connector do?
+* Create table automatically if not exist
+* Schema change synchronization
+* Data synchronization
+
+How to create Pipeline
+----------------
+
+The pipeline for reading data from MySQL and sink to Hudi can be defined as 
follows:
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+
+  server-id: 5401-5404
+
+sink:
+  type: hudi 
+  name: Hudi Sink
+  path: /path/warehouse
+  hoodie.table.type: MERGE_ON_READ
+
+transform:
+  - source-table: adb.\.*
+    table-options: ordering.fields=ts1
+  - source-table: bdb.user_table_[0-9]+
+    table-options: ordering.fields=ts2
+
+pipeline:
+  name: MySQL to Hudi Pipeline
+  parallelism: 4
+```
+
+Pipeline Connector Options
+----------------
+<div class="highlight">
+<table class="colwidths-auto docutils">
+   <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>type</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, here should be <code>hudi</code></td>
+    </tr>
+    <tr>
+      <td>path</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify base path for all <code>hudi</code> tables</td>
+    </tr>
+    <tr>
+      <td>name</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the sink</td>
+    </tr>
+    <tr>
+      <td>hoodie.table.type</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Type of table to write, currently only <code>MERGE_ON_READ</code> is 
supported</td>
+    </tr>
+    <tr>
+      <td>index.type</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Index type of Flink write job, currently only <code>BUCKET</code> is 
supported</td>
+    </tr>
+    <tr>
+      <td>table.properties.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Pass Hudi table options to the pipeline,See <a 
href="https://hudi.apache.org/docs/configurations/#FLINK_SQL";>Hudi table 
options</a></td>
+    </tr>
+    </tbody>
+</table>    
+</div>
+
+Usage Notes
+--------
+
+* The source table must have a primary key. Users can use 
<code>transform</code> rule to override the primary key configuration or set up 
ordering fields configuration.
+* Only MERGE_ON_READ table with simple BUCKET index is supported now.
+* Exactly-once semantics are not supported. The connector uses at-least-once + 
the table's primary key for idempotent writing.
+
+Data Type Mapping
+----------------
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">CDC type</th>
+        <th class="text-left">Hudi type</th>
+        <th class="text-left" style="width:60%;">NOTE</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>TINYINT</td>
+      <td>TINYINT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>SMALLINT</td>
+      <td>SMALLINT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>INT</td>
+      <td>INT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BIGINT</td>
+      <td>BIGINT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>FLOAT</td>
+      <td>FLOAT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DOUBLE</td>
+      <td>DOUBLE</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DECIMAL(p, s)</td>
+      <td>DECIMAL(p, s)</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BOOLEAN</td>
+      <td>BOOLEAN</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DATE</td>
+      <td>DATE</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>TIME</td>
+      <td>TIME</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP</td>
+      <td>TIMESTAMP</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BINARY</td>
+      <td>BINARY</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>CHAR(n)</td>
+      <td>CHAR(n)</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>VARCHAR(n)</td>
+      <td>VARCHAR(n)</td>
+      <td></td>
+    </tr>
+    </tbody>
+</table>
+</div>
+
+{{< top >}}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiConfig.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiConfig.java
index 52ad6ed36..b4aa51fe7 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiConfig.java
@@ -21,7 +21,6 @@ import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.ConfigOptions;
 import org.apache.flink.configuration.description.Description;
 
-import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 
 /**
@@ -48,10 +47,11 @@ public class HudiConfig {
                 .withDescription(description.toString());
     }
 
-    private static ConfigOption<Integer> intOption(String key, Description 
description) {
+    private static ConfigOption<Integer> intOption(
+            String key, int defaultValue, Description description) {
         return ConfigOptions.key(key)
                 .intType()
-                .noDefaultValue()
+                .defaultValue(defaultValue)
                 .withDescription(description.toString());
     }
 
@@ -75,86 +75,15 @@ public class HudiConfig {
                     FlinkOptions.TABLE_TYPE.defaultValue(),
                     FlinkOptions.TABLE_TYPE.description());
 
-    // Required Fields for CDC
-    public static final ConfigOption<String> RECORD_KEY_FIELD =
-            stringOption(
-                    FlinkOptions.RECORD_KEY_FIELD.key(),
-                    FlinkOptions.RECORD_KEY_FIELD.description());
-
     public static final ConfigOption<String> ORDERING_FIELDS =
             stringOption(
                     FlinkOptions.ORDERING_FIELDS.key(), 
FlinkOptions.ORDERING_FIELDS.description());
 
-    public static final ConfigOption<String> PARTITION_PATH_FIELD =
-            stringOption(
-                    FlinkOptions.PARTITION_PATH_FIELD.key(),
-                    "",
-                    FlinkOptions.PARTITION_PATH_FIELD.description());
-
     // Bucket Index Options
     public static final ConfigOption<String> INDEX_TYPE =
             stringOption(
                     FlinkOptions.INDEX_TYPE.key(), "BUCKET", 
FlinkOptions.INDEX_TYPE.description());
 
-    public static final ConfigOption<String> INDEX_BUCKET_TARGET =
-            stringOption(
-                    FlinkOptions.INDEX_KEY_FIELD.key(), 
FlinkOptions.INDEX_KEY_FIELD.description());
-
-    public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS =
-            intOption(
-                    FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(),
-                    FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.description());
-
-    // Hive Sync Options
-    public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED =
-            booleanOption(
-                    FlinkOptions.HIVE_SYNC_ENABLED.key(),
-                    false,
-                    FlinkOptions.HIVE_SYNC_ENABLED.description());
-
-    public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS =
-            stringOption(
-                    FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(),
-                    FlinkOptions.HIVE_SYNC_METASTORE_URIS.description());
-
-    public static final ConfigOption<String> HIVE_SYNC_DB =
-            stringOption(FlinkOptions.HIVE_SYNC_DB.key(), 
FlinkOptions.HIVE_SYNC_DB.description());
-
-    public static final ConfigOption<String> HIVE_SYNC_TABLE =
-            stringOption(
-                    FlinkOptions.HIVE_SYNC_TABLE.key(), 
FlinkOptions.HIVE_SYNC_TABLE.description());
-
-    public static final ConfigOption<String> SCHEMA_OPERATOR_UID =
-            ConfigOptions.key("schema.operator.uid")
-                    .stringType()
-                    .defaultValue("schema-operator-uid")
-                    .withDescription(
-                            "A unique ID for the schema operator, used by the 
BucketAssignerOperator to create a SchemaEvolutionClient.");
-
-    public static final ConfigOption<String> TABLE_SCHEMA =
-            ConfigOptions.key("table.schema")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The table schema in JSON format for the 
Hudi table.");
-
-    public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS =
-            intOption(
-                    FlinkOptions.BUCKET_ASSIGN_TASKS.key(),
-                    FlinkOptions.BUCKET_ASSIGN_TASKS.description());
-
     public static final ConfigOption<Integer> WRITE_TASKS =
-            intOption(FlinkOptions.WRITE_TASKS.key(), 
FlinkOptions.WRITE_TASKS.description());
-
-    public static final ConfigOption<Boolean> SCHEMA_ON_READ_ENABLE =
-            booleanOption(
-                    HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
-                    false,
-                    Description.builder().build());
-
-    public static final ConfigOption<Integer> COMPACTION_DELTA_COMMITS =
-            ConfigOptions.key("compaction.delta_commits")
-                    .intType()
-                    .defaultValue(5)
-                    .withDescription(
-                            "Max delta commits needed to trigger compaction, 
default 5 commits");
+            intOption(FlinkOptions.WRITE_TASKS.key(), 4, 
FlinkOptions.WRITE_TASKS.description());
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java
index 400176148..388183f6b 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java
@@ -30,6 +30,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.cdc.connectors.hudi.sink.HudiDataSinkOptions.PREFIX_TABLE_PROPERTIES;
 
 /**
  * A {@link DataSink} for Apache Hudi that provides the main entry point for 
the Flink CDC
@@ -45,7 +49,7 @@ public class HudiDataSink implements DataSink, Serializable {
 
     public HudiDataSink(Configuration config, String schemaOperatorUid) {
         LOG.info("Creating HudiDataSink with universal configuration {}", 
config);
-        this.config = config;
+        this.config = normalizeOptions(config);
         this.schemaOperatorUid = schemaOperatorUid;
     }
 
@@ -94,4 +98,18 @@ public class HudiDataSink implements DataSink, Serializable {
         
flinkConfig.setString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
         return flinkConfig;
     }
+
+    private static Configuration normalizeOptions(Configuration config) {
+        Map<String, String> options = new HashMap<>();
+        config.toMap()
+                .forEach(
+                        (key, val) -> {
+                            if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
+                                
options.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), val);
+                            } else {
+                                options.put(key, val);
+                            }
+                        });
+        return Configuration.fromMap(options);
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java
index f5d22d228..fbc61d724 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.factories.DataSinkFactory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 
+import org.apache.hudi.index.HoodieIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +60,7 @@ public class HudiDataSinkFactory implements DataSinkFactory {
 
         // Validate that only BUCKET index type is used
         String indexType = config.get(HudiConfig.INDEX_TYPE);
-        if (indexType != null && !indexType.equalsIgnoreCase("BUCKET")) {
+        if (indexType != null && 
!indexType.equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())) {
             throw new IllegalArgumentException(
                     String.format(
                             "Unsupported index type '%s'. Currently only 
'BUCKET' index type is supported. "
@@ -67,6 +68,7 @@ public class HudiDataSinkFactory implements DataSinkFactory {
                                     + "for multi-table CDC pipelines.",
                             indexType));
         }
+        config.set(HudiConfig.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
 
         String schemaOperatorUid =
                 context.getPipelineConfiguration()
@@ -85,22 +87,8 @@ public class HudiDataSinkFactory implements DataSinkFactory {
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(HudiConfig.RECORD_KEY_FIELD);
         options.add(HudiConfig.TABLE_TYPE);
-        options.add(HudiConfig.PARTITION_PATH_FIELD);
         options.add(HudiConfig.INDEX_TYPE);
-        options.add(HudiConfig.INDEX_BUCKET_TARGET);
-        options.add(HudiConfig.HIVE_SYNC_ENABLED);
-        options.add(HudiConfig.HIVE_SYNC_METASTORE_URIS);
-        options.add(HudiConfig.HIVE_SYNC_DB);
-        options.add(HudiConfig.HIVE_SYNC_TABLE);
-
-        options.add(HudiConfig.WRITE_TASKS);
-        options.add(HudiConfig.BUCKET_ASSIGN_TASKS);
-        options.add(HudiConfig.SCHEMA_ON_READ_ENABLE);
-
-        // Compaction settings
-        options.add(HudiConfig.COMPACTION_DELTA_COMMITS);
         return options;
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/ConfigUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/ConfigUtils.java
index 4c2ce2fec..fe0b37f20 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/ConfigUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/ConfigUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.index.HoodieIndex;
@@ -75,10 +74,9 @@ public class ConfigUtils {
             conf.set(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", 
partitionKeys));
         }
 
-        Option<String> orderingFieldsOpt =
-                
Option.ofNullable(schema.options().get(FlinkOptions.ORDERING_FIELDS.key()));
-        orderingFieldsOpt.ifPresent(
-                orderingFields -> conf.set(FlinkOptions.ORDERING_FIELDS, 
orderingFields));
+        for (Map.Entry<String, String> kv : schema.options().entrySet()) {
+            conf.setString(kv.getKey(), kv.getValue());
+        }
 
         if 
(conf.get(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) 
{
             if (conf.get(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
@@ -120,10 +118,6 @@ public class ConfigUtils {
         }
 
         // setup ordering fields from schema options
-        Option<String> orderingFieldsOpt =
-                
Option.ofNullable(schema.options().get(FlinkOptions.ORDERING_FIELDS.key()));
-        orderingFieldsOpt.ifPresent(
-                orderingFields ->
-                        tableOptions.put(FlinkOptions.ORDERING_FIELDS.key(), 
orderingFields));
+        tableOptions.putAll(schema.options());
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactoryTest.java
index ddc2e14c5..d6c141626 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactoryTest.java
@@ -105,7 +105,7 @@ class HudiDataSinkFactoryTest {
                 Configuration.fromMap(
                         ImmutableMap.<String, String>builder()
                                 .put(HudiConfig.PATH.key(), 
temporaryFolder.toString())
-                                .put(HudiConfig.RECORD_KEY_FIELD.key(), "id")
+                                .put(HudiConfig.TABLE_TYPE.key(), 
"MERGE_ON_READ")
                                 .put("unsupported_key", "unsupported_value")
                                 .build());
 
@@ -132,7 +132,7 @@ class HudiDataSinkFactoryTest {
                 Configuration.fromMap(
                         ImmutableMap.<String, String>builder()
                                 .put(HudiConfig.PATH.key(), 
temporaryFolder.toString())
-                                .put(HudiConfig.RECORD_KEY_FIELD.key(), "id")
+                                .put(HudiConfig.TABLE_TYPE.key(), 
"MERGE_ON_READ")
                                 .build());
 
         DataSink dataSink =
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
index cf8cc17ee..76936b698 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
@@ -223,9 +223,7 @@ public class MySqlToHudiE2eITCase extends 
PipelineTestEnvironment {
                                 + "  hoodie.table.type: "
                                 + TABLE_TYPE
                                 + " \n"
-                                + "  write.bucket_assign.tasks: 2\n"
-                                + "  write.tasks: 2\n"
-                                + "  compaction.delta_commits: 2\n"
+                                + "  
table.properties.compaction.delta_commits: 2\n"
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  schema.change.behavior: evolve\n"
@@ -561,8 +559,6 @@ public class MySqlToHudiE2eITCase extends 
PipelineTestEnvironment {
                                 + "  type: hudi\n"
                                 + "  path: %s\n"
                                 + "  hoodie.table.type: MERGE_ON_READ\n"
-                                + "  write.bucket_assign.tasks: 2\n"
-                                + "  write.tasks: 2\n"
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  schema.change.behavior: evolve\n"

Reply via email to