This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new db1caee89 [FLINK-38837][docs][pipeline-connector/hudi] Add documentation for Hudi connector and improve the configuration
db1caee89 is described below
commit db1caee89e4572ec776373fa8718dcf706415a1e
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Dec 26 20:46:31 2025 +0800
[FLINK-38837][docs][pipeline-connector/hudi] Add documentation for Hudi connector and improve the configuration
This closes #4200.
---
.../docs/connectors/pipeline-connectors/hudi.md | 222 +++++++++++++++++++++
.../docs/connectors/pipeline-connectors/hudi.md | 222 +++++++++++++++++++++
.../flink/cdc/connectors/hudi/sink/HudiConfig.java | 79 +-------
.../cdc/connectors/hudi/sink/HudiDataSink.java | 20 +-
.../connectors/hudi/sink/HudiDataSinkFactory.java | 18 +-
.../cdc/connectors/hudi/sink/util/ConfigUtils.java | 14 +-
.../hudi/sink/HudiDataSinkFactoryTest.java | 4 +-
.../cdc/pipeline/tests/MySqlToHudiE2eITCase.java | 6 +-
8 files changed, 477 insertions(+), 108 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/hudi.md b/docs/content.zh/docs/connectors/pipeline-connectors/hudi.md
new file mode 100644
index 000000000..c5e49ae93
--- /dev/null
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/hudi.md
@@ -0,0 +1,222 @@
+---
+title: "Hudi"
+weight: 10
+type: docs
+aliases:
+- /connectors/pipeline-connectors/hudi
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hudi Pipeline 连接器
+
+Hudi Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入 [Apache Hudi](https://hudi.apache.org)。本文档介绍如何设置 Hudi Pipeline 连接器。
+
+## 连接器的功能
+* 自动创建 Hudi 表
+* 自动同步表结构变更
+* 数据实时同步
+
+如何创建 Pipeline
+----------------
+
+从 MySQL 读取数据同步到 Hudi 的 Pipeline 可以定义如下:
+
+```yaml
+source:
+ type: mysql
+ name: MySQL Source
+ hostname: 127.0.0.1
+ port: 3306
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+
+ server-id: 5401-5404
+
+sink:
+ type: hudi
+ name: Hudi Sink
+ path: /path/warehouse
+ hoodie.table.type: MERGE_ON_READ
+
+transform:
+ - source-table: adb.\.*
+ table-options: ordering.fields=ts1
+ - source-table: bdb.user_table_[0-9]+
+ table-options: ordering.fields=ts2
+
+pipeline:
+ name: MySQL to Hudi Pipeline
+ parallelism: 4
+```
+
+Pipeline 连接器配置项
+----------------
+<div class="highlight">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-left" style="width: 8%">Required</th>
+ <th class="text-left" style="width: 7%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>type</td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>指定要使用的连接器, 这里需要设置成 <code>'hudi'</code></td>
+ </tr>
+ <tr>
+ <td>path</td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>为目标 <code>hudi</code> 表指定基本路径</td>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Sink 的名称</td>
+ </tr>
+ <tr>
+ <td>hoodie.table.type</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>目标 <code>hudi</code> 表的类型, 目前仅支持 <code>MERGE_ON_READ</code></td>
+ </tr>
+ <tr>
+ <td>index.type</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>目标 <code>hudi</code> 表的索引类型, 目前仅支持 <code>BUCKET</code></td>
+ </tr>
+ <tr>
+ <td>table.properties.*</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>将 <code>hudi</code> 表支持的参数传递给 pipeline,参考 <a href="https://hudi.apache.org/docs/configurations/#FLINK_SQL">Hudi table options</a>,示例见本表下方</td>
+ </tr>
+ </tbody>
+</table>
+</div>
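+
+下面给出一个通过 `table.properties.*` 传递 Hudi 表参数的 sink 配置示例(仅作示意,其中 `compaction.delta_commits` 的取值为假设值)。带有 `table.properties.` 前缀的键会在去掉前缀后透传给 Hudi 表:
+
+```yaml
+sink:
+  type: hudi
+  name: Hudi Sink
+  path: /path/warehouse
+  hoodie.table.type: MERGE_ON_READ
+  # 等价于为 Hudi 表设置 compaction.delta_commits = 2
+  table.properties.compaction.delta_commits: 2
+```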
+
+使用说明
+--------
+
+* 源表必须定义主键。用户可通过 <code>transform</code> 规则覆盖主键配置,或设置 ordering fields 配置,参见本节下方示例。
+* 目前只支持 MERGE_ON_READ 表类型,以及简单 BUCKET 索引。
+* 暂不支持 exactly-once 语义,连接器通过 at-least-once 语义和主键表实现幂等写。
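+
+例如,可以通过如下 transform 规则覆盖主键并设置 ordering fields(示例仅作参考,假设使用 transform 规则的 `primary-keys` 与 `table-options` 配置项,其中 `id`、`ts1` 均为示例字段名):
+
+```yaml
+transform:
+  - source-table: adb.\.*
+    # 覆盖下游 Hudi 表使用的主键
+    primary-keys: id
+    # 通过表级参数设置 ordering fields
+    table-options: ordering.fields=ts1
+```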
+
+数据类型映射
+----------------
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left">CDC type</th>
+ <th class="text-left">Hudi type</th>
+ <th class="text-left" style="width:60%;">NOTE</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>TINYINT</td>
+ <td>TINYINT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>SMALLINT</td>
+ <td>SMALLINT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>INT</td>
+ <td>INT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>BIGINT</td>
+ <td>BIGINT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>FLOAT</td>
+ <td>FLOAT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DOUBLE</td>
+ <td>DOUBLE</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DECIMAL(p, s)</td>
+ <td>DECIMAL(p, s)</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>BOOLEAN</td>
+ <td>BOOLEAN</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DATE</td>
+ <td>DATE</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>TIME</td>
+ <td>TIME</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>TIMESTAMP</td>
+ <td>TIMESTAMP</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>BINARY</td>
+ <td>BINARY</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>CHAR(n)</td>
+ <td>CHAR(n)</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>VARCHAR(n)</td>
+ <td>VARCHAR(n)</td>
+ <td></td>
+ </tr>
+ </tbody>
+</table>
+</div>
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/pipeline-connectors/hudi.md b/docs/content/docs/connectors/pipeline-connectors/hudi.md
new file mode 100644
index 000000000..4de3a683c
--- /dev/null
+++ b/docs/content/docs/connectors/pipeline-connectors/hudi.md
@@ -0,0 +1,222 @@
+---
+title: "Hudi"
+weight: 10
+type: docs
+aliases:
+- /connectors/pipeline-connectors/hudi
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hudi Pipeline Connector
+
+The Hudi Pipeline Connector functions as a *Data Sink* for data pipelines, enabling data writes to [Apache Hudi](https://hudi.apache.org) tables. This document explains how to configure the connector.
+
+## What can the connector do?
+* Create tables automatically if they do not exist
+* Schema change synchronization
+* Data synchronization
+
+How to create Pipeline
+----------------
+
+A pipeline that reads data from MySQL and writes to Hudi can be defined as follows:
+
+```yaml
+source:
+ type: mysql
+ name: MySQL Source
+ hostname: 127.0.0.1
+ port: 3306
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+
+ server-id: 5401-5404
+
+sink:
+ type: hudi
+ name: Hudi Sink
+ path: /path/warehouse
+ hoodie.table.type: MERGE_ON_READ
+
+transform:
+ - source-table: adb.\.*
+ table-options: ordering.fields=ts1
+ - source-table: bdb.user_table_[0-9]+
+ table-options: ordering.fields=ts2
+
+pipeline:
+ name: MySQL to Hudi Pipeline
+ parallelism: 4
+```
+
+Pipeline Connector Options
+----------------
+<div class="highlight">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-left" style="width: 8%">Required</th>
+ <th class="text-left" style="width: 7%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>type</td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Specify the connector to use; here it should be <code>hudi</code></td>
+ </tr>
+ <tr>
+ <td>path</td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Specify the base path for all <code>hudi</code> tables</td>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The name of the sink</td>
+ </tr>
+ <tr>
+ <td>hoodie.table.type</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Type of table to write, currently only <code>MERGE_ON_READ</code> is supported</td>
+ </tr>
+ <tr>
+ <td>index.type</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Index type of Flink write job, currently only <code>BUCKET</code> is supported</td>
+ </tr>
+ <tr>
+ <td>table.properties.*</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Pass Hudi table options to the pipeline; see <a href="https://hudi.apache.org/docs/configurations/#FLINK_SQL">Hudi table options</a>. An example is shown below this table</td>
+ </tr>
+ </tbody>
+</table>
+</div>
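+
+As a sketch (the `compaction.delta_commits` value below is purely illustrative), a sink block can pass a Hudi table option through `table.properties.*`; keys carrying the `table.properties.` prefix are forwarded to the Hudi table with the prefix stripped:
+
+```yaml
+sink:
+  type: hudi
+  name: Hudi Sink
+  path: /path/warehouse
+  hoodie.table.type: MERGE_ON_READ
+  # Equivalent to setting the Hudi table option compaction.delta_commits = 2
+  table.properties.compaction.delta_commits: 2
+```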
+
+Usage Notes
+--------
+
+* The source table must have a primary key. Users can use a <code>transform</code> rule to override the primary key configuration or set the ordering fields configuration, as shown in the example after this list.
+* Only the MERGE_ON_READ table type with a simple BUCKET index is supported for now.
+* Exactly-once semantics are not supported; the connector relies on at-least-once delivery together with the table's primary key to achieve idempotent writes.
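+
+For example, a transform rule can override the primary key and set the ordering fields. This is only a sketch; it assumes the standard transform options `primary-keys` and `table-options`, and `id` / `ts1` are placeholder column names:
+
+```yaml
+transform:
+  - source-table: adb.\.*
+    # Override the primary key used for the downstream Hudi table
+    primary-keys: id
+    # Set the ordering fields through a table-level option
+    table-options: ordering.fields=ts1
+```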
+
+Data Type Mapping
+----------------
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+ <thead>
+ <tr>
+ <th class="text-left">CDC type</th>
+ <th class="text-left">Hudi type</th>
+ <th class="text-left" style="width:60%;">NOTE</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>TINYINT</td>
+ <td>TINYINT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>SMALLINT</td>
+ <td>SMALLINT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>INT</td>
+ <td>INT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>BIGINT</td>
+ <td>BIGINT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>FLOAT</td>
+ <td>FLOAT</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DOUBLE</td>
+ <td>DOUBLE</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DECIMAL(p, s)</td>
+ <td>DECIMAL(p, s)</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>BOOLEAN</td>
+ <td>BOOLEAN</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DATE</td>
+ <td>DATE</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>TIME</td>
+ <td>TIME</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>TIMESTAMP</td>
+ <td>TIMESTAMP</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>BINARY</td>
+ <td>BINARY</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>CHAR(n)</td>
+ <td>CHAR(n)</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>VARCHAR(n)</td>
+ <td>VARCHAR(n)</td>
+ <td></td>
+ </tr>
+ </tbody>
+</table>
+</div>
+
+{{< top >}}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiConfig.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiConfig.java
index 52ad6ed36..b4aa51fe7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiConfig.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiConfig.java
@@ -21,7 +21,6 @@ import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;
-import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.configuration.FlinkOptions;
/**
@@ -48,10 +47,11 @@ public class HudiConfig {
.withDescription(description.toString());
}
- private static ConfigOption<Integer> intOption(String key, Description description) {
+ private static ConfigOption<Integer> intOption(
+ String key, int defaultValue, Description description) {
return ConfigOptions.key(key)
.intType()
- .noDefaultValue()
+ .defaultValue(defaultValue)
.withDescription(description.toString());
}
@@ -75,86 +75,15 @@ public class HudiConfig {
FlinkOptions.TABLE_TYPE.defaultValue(),
FlinkOptions.TABLE_TYPE.description());
- // Required Fields for CDC
- public static final ConfigOption<String> RECORD_KEY_FIELD =
- stringOption(
- FlinkOptions.RECORD_KEY_FIELD.key(),
- FlinkOptions.RECORD_KEY_FIELD.description());
-
public static final ConfigOption<String> ORDERING_FIELDS =
stringOption(
FlinkOptions.ORDERING_FIELDS.key(),
FlinkOptions.ORDERING_FIELDS.description());
- public static final ConfigOption<String> PARTITION_PATH_FIELD =
- stringOption(
- FlinkOptions.PARTITION_PATH_FIELD.key(),
- "",
- FlinkOptions.PARTITION_PATH_FIELD.description());
-
// Bucket Index Options
public static final ConfigOption<String> INDEX_TYPE =
stringOption(
FlinkOptions.INDEX_TYPE.key(), "BUCKET",
FlinkOptions.INDEX_TYPE.description());
- public static final ConfigOption<String> INDEX_BUCKET_TARGET =
- stringOption(
- FlinkOptions.INDEX_KEY_FIELD.key(), FlinkOptions.INDEX_KEY_FIELD.description());
-
- public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS =
- intOption(
- FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(),
- FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.description());
-
- // Hive Sync Options
- public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED =
- booleanOption(
- FlinkOptions.HIVE_SYNC_ENABLED.key(),
- false,
- FlinkOptions.HIVE_SYNC_ENABLED.description());
-
- public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS =
- stringOption(
- FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(),
- FlinkOptions.HIVE_SYNC_METASTORE_URIS.description());
-
- public static final ConfigOption<String> HIVE_SYNC_DB =
- stringOption(FlinkOptions.HIVE_SYNC_DB.key(), FlinkOptions.HIVE_SYNC_DB.description());
-
- public static final ConfigOption<String> HIVE_SYNC_TABLE =
- stringOption(
- FlinkOptions.HIVE_SYNC_TABLE.key(), FlinkOptions.HIVE_SYNC_TABLE.description());
-
- public static final ConfigOption<String> SCHEMA_OPERATOR_UID =
- ConfigOptions.key("schema.operator.uid")
- .stringType()
- .defaultValue("schema-operator-uid")
- .withDescription(
- "A unique ID for the schema operator, used by the BucketAssignerOperator to create a SchemaEvolutionClient.");
-
- public static final ConfigOption<String> TABLE_SCHEMA =
- ConfigOptions.key("table.schema")
- .stringType()
- .noDefaultValue()
- .withDescription("The table schema in JSON format for the Hudi table.");
-
- public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS =
- intOption(
- FlinkOptions.BUCKET_ASSIGN_TASKS.key(),
- FlinkOptions.BUCKET_ASSIGN_TASKS.description());
-
public static final ConfigOption<Integer> WRITE_TASKS =
- intOption(FlinkOptions.WRITE_TASKS.key(), FlinkOptions.WRITE_TASKS.description());
-
- public static final ConfigOption<Boolean> SCHEMA_ON_READ_ENABLE =
- booleanOption(
- HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
- false,
- Description.builder().build());
-
- public static final ConfigOption<Integer> COMPACTION_DELTA_COMMITS =
- ConfigOptions.key("compaction.delta_commits")
- .intType()
- .defaultValue(5)
- .withDescription(
- "Max delta commits needed to trigger compaction, default 5 commits");
+ intOption(FlinkOptions.WRITE_TASKS.key(), 4, FlinkOptions.WRITE_TASKS.description());
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java
index 400176148..388183f6b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSink.java
@@ -30,6 +30,10 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.hudi.sink.HudiDataSinkOptions.PREFIX_TABLE_PROPERTIES;
/**
* A {@link DataSink} for Apache Hudi that provides the main entry point for the Flink CDC
@@ -45,7 +49,7 @@ public class HudiDataSink implements DataSink, Serializable {
public HudiDataSink(Configuration config, String schemaOperatorUid) {
LOG.info("Creating HudiDataSink with universal configuration {}", config);
- this.config = config;
+ this.config = normalizeOptions(config);
this.schemaOperatorUid = schemaOperatorUid;
}
@@ -94,4 +98,18 @@ public class HudiDataSink implements DataSink, Serializable {
flinkConfig.setString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
return flinkConfig;
}
+
+ private static Configuration normalizeOptions(Configuration config) {
+ Map<String, String> options = new HashMap<>();
+ config.toMap()
+ .forEach(
+ (key, val) -> {
+ if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
+ options.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), val);
+ } else {
+ options.put(key, val);
+ }
+ });
+ return Configuration.fromMap(options);
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java
index f5d22d228..fbc61d724 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.hudi.index.HoodieIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +60,7 @@ public class HudiDataSinkFactory implements DataSinkFactory {
// Validate that only BUCKET index type is used
String indexType = config.get(HudiConfig.INDEX_TYPE);
- if (indexType != null && !indexType.equalsIgnoreCase("BUCKET")) {
+ if (indexType != null && !indexType.equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())) {
throw new IllegalArgumentException(
String.format(
"Unsupported index type '%s'. Currently only
'BUCKET' index type is supported. "
@@ -67,6 +68,7 @@ public class HudiDataSinkFactory implements DataSinkFactory {
+ "for multi-table CDC pipelines.",
indexType));
}
+ config.set(HudiConfig.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
String schemaOperatorUid =
context.getPipelineConfiguration()
@@ -85,22 +87,8 @@ public class HudiDataSinkFactory implements DataSinkFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
- options.add(HudiConfig.RECORD_KEY_FIELD);
options.add(HudiConfig.TABLE_TYPE);
- options.add(HudiConfig.PARTITION_PATH_FIELD);
options.add(HudiConfig.INDEX_TYPE);
- options.add(HudiConfig.INDEX_BUCKET_TARGET);
- options.add(HudiConfig.HIVE_SYNC_ENABLED);
- options.add(HudiConfig.HIVE_SYNC_METASTORE_URIS);
- options.add(HudiConfig.HIVE_SYNC_DB);
- options.add(HudiConfig.HIVE_SYNC_TABLE);
-
- options.add(HudiConfig.WRITE_TASKS);
- options.add(HudiConfig.BUCKET_ASSIGN_TASKS);
- options.add(HudiConfig.SCHEMA_ON_READ_ENABLE);
-
- // Compaction settings
- options.add(HudiConfig.COMPACTION_DELTA_COMMITS);
return options;
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/ConfigUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/ConfigUtils.java
index 4c2ce2fec..fe0b37f20 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/ConfigUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/ConfigUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
@@ -75,10 +74,9 @@ public class ConfigUtils {
conf.set(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
}
- Option<String> orderingFieldsOpt =
- Option.ofNullable(schema.options().get(FlinkOptions.ORDERING_FIELDS.key()));
- orderingFieldsOpt.ifPresent(
- orderingFields -> conf.set(FlinkOptions.ORDERING_FIELDS, orderingFields));
+ for (Map.Entry<String, String> kv : schema.options().entrySet()) {
+ conf.setString(kv.getKey(), kv.getValue());
+ }
if (conf.get(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
if (conf.get(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
@@ -120,10 +118,6 @@ public class ConfigUtils {
}
// setup ordering fields from schema options
- Option<String> orderingFieldsOpt =
- Option.ofNullable(schema.options().get(FlinkOptions.ORDERING_FIELDS.key()));
- orderingFieldsOpt.ifPresent(
- orderingFields ->
- tableOptions.put(FlinkOptions.ORDERING_FIELDS.key(), orderingFields));
+ tableOptions.putAll(schema.options());
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactoryTest.java
index ddc2e14c5..d6c141626 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactoryTest.java
@@ -105,7 +105,7 @@ class HudiDataSinkFactoryTest {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(HudiConfig.PATH.key(), temporaryFolder.toString())
- .put(HudiConfig.RECORD_KEY_FIELD.key(), "id")
+ .put(HudiConfig.TABLE_TYPE.key(), "MERGE_ON_READ")
.put("unsupported_key", "unsupported_value")
.build());
@@ -132,7 +132,7 @@ class HudiDataSinkFactoryTest {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(HudiConfig.PATH.key(), temporaryFolder.toString())
- .put(HudiConfig.RECORD_KEY_FIELD.key(), "id")
+ .put(HudiConfig.TABLE_TYPE.key(), "MERGE_ON_READ")
.build());
DataSink dataSink =
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
index cf8cc17ee..76936b698 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
@@ -223,9 +223,7 @@ public class MySqlToHudiE2eITCase extends PipelineTestEnvironment {
+ " hoodie.table.type: "
+ TABLE_TYPE
+ " \n"
- + " write.bucket_assign.tasks: 2\n"
- + " write.tasks: 2\n"
- + " compaction.delta_commits: 2\n"
+ + " table.properties.compaction.delta_commits: 2\n"
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: evolve\n"
@@ -561,8 +559,6 @@ public class MySqlToHudiE2eITCase extends PipelineTestEnvironment {
+ " type: hudi\n"
+ " path: %s\n"
+ " hoodie.table.type: MERGE_ON_READ\n"
- + " write.bucket_assign.tasks: 2\n"
- + " write.tasks: 2\n"
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: evolve\n"