This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c351a06b [AMORO-3102][doc]: upgrade cdc-ingestion doc from flink cdc 3.x (#3102)
8c351a06b is described below

commit 8c351a06b527026c84ba5ec0b49509c634b72d89
Author: ConradJam <[email protected]>
AuthorDate: Thu Aug 22 10:09:55 2024 +0800

    [AMORO-3102][doc]: upgrade cdc-ingestion doc from flink cdc 3.x (#3102)
    
    * [AMORO-3102][doc]: upgrade cdc-ingestion doc from flink cdc 3.x
    
    * upgrade cdc-ingestion doc
    
    * fix links
    
    ---------
    
    Co-authored-by: ConradJam <[email protected]>
---
 .../flink/flink-cdc-ingestion.md}                  |  67 +--
 docs/user-guides/cdc-ingestion.md                  | 514 +--------------------
 2 files changed, 62 insertions(+), 519 deletions(-)

diff --git a/docs/user-guides/cdc-ingestion.md b/docs/engines/flink/flink-cdc-ingestion.md
similarity index 89%
copy from docs/user-guides/cdc-ingestion.md
copy to docs/engines/flink/flink-cdc-ingestion.md
index 68226a412..7443df3d2 100644
--- a/docs/user-guides/cdc-ingestion.md
+++ b/docs/engines/flink/flink-cdc-ingestion.md
@@ -1,8 +1,8 @@
 ---
-title: "CDC Ingestion"
-url: cdc-ingestion
+title: "Flink CDC Ingestion"
+url: flink-cdc-ingestion
 aliases:
-    - "user-guides/cdc-ingestion"
+  - "flink/cdc"
 menu:
     main:
         parent: User Guides
@@ -24,17 +24,17 @@ menu:
  - See the License for the specific language governing permissions and
  - limitations under the License.
  -->
-# CDC Ingestion
-CDC stands for Change Data Capture, which is a broad concept, as long as it 
can capture the change data, it can be called CDC. [Flink 
CDC](https://github.com/ververica/flink-cdc-connectors) is a Log message-based 
data capture tool, all the inventory and incremental data can be captured. 
Taking MySQL as an example, it can easily capture Binlog data through Debezium 
and process the calculations in real time to send them to the data lake. The 
data lake can then be queried by other engines.
+# Apache CDC Ingestion
+CDC stands for Change Data Capture, which is a broad concept, as long as it 
can capture the change data, it can be called CDC. [Flink 
CDC](https://github.com/apache/flink-cdc) is a Log message-based data capture 
tool, all the inventory and incremental data can be captured. Taking MySQL as 
an example, it can easily capture Binlog data through Debezium and process the 
calculations in real time to send them to the data lake. The data lake can then 
be queried by other engines.
 
 This section will show how to ingest one table or multiple tables into the 
data lake for both [Iceberg](../iceberg-format/) format and 
[Mixed-Iceberg](../mixed-iceberg-format/) format.
 ## Ingest into one table
 ### Iceberg format
-The following example will show how MySQL CDC data is written to an Iceberg 
table.
+The following example will show how [MySQL 
CDC](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/) data 
is written to an Iceberg table.
 
 **Requirements**
 
-Please add [Flink SQL Connector MySQL 
CDC](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-connector-mysql-cdc-2.3.0.jar)
 and 
[Iceberg](https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-1.14/1.1.0/iceberg-flink-1.14-1.1.0.jar)
 Jars to the lib directory of the Flink engine package.
+Please add [Flink SQL Connector MySQL CDC](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-mysql-cdc) and [Iceberg](https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-1.18/1.6.0/iceberg-flink-1.18-1.6.0.jar) Jars to the lib directory of the Flink engine package.
 
 ```sql
 CREATE TABLE products (
@@ -74,7 +74,7 @@ The following example will show how MySQL CDC data is written to a Mixed-Iceberg
 
 **Requirements**
 
-Please add [Flink SQL Connector MySQL 
CDC](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-connector-mysql-cdc-2.3.0.jar)
 and [Amoro](../../../download/) Jars to the lib directory of the Flink engine 
package.
+Please add [Flink SQL Connector MySQL CDC](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-mysql-cdc/3.1.1) and [Amoro](../../../download/) Jars to the lib directory of the Flink engine package.
 
 ```sql
 CREATE TABLE products (
@@ -92,19 +92,19 @@ CREATE TABLE products (
     'table-name' = 'products'
 );
 
-CREATE CATALOG arctic_catalog WITH (
-    'type'='arctic',
+CREATE CATALOG amoro_catalog WITH (
+    'type'='amoro',
     'metastore.url'='thrift://<ip>:<port>/<catalog_name_in_metastore>'
 ); 
 
-CREATE TABLE IF NOT EXISTS `arctic_catalog`.`db`.`test_tb`(
+CREATE TABLE IF NOT EXISTS `amoro_catalog`.`db`.`test_tb`(
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
 );
 
-INSERT INTO `arctic_catalog`.`db`.`test_tb` SELECT * FROM products;
+INSERT INTO `amoro_catalog`.`db`.`test_tb` SELECT * FROM products;
 ```
 
 ## Ingest Into multiple tables
@@ -113,22 +113,35 @@ The following example will show how to write CDC data from multiple MySQL tables
 
 **Requirements**
 
-Please add [Flink Connector MySQL 
CDC](https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc/2.3.0)
 and 
[Iceberg](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-1.14/1.1.0)
 dependencies to your Maven project's pom.xml file.
+Please add [Flink Connector MySQL CDC](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-mysql-cdc/3.1.1)
+and [Iceberg](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-1.18/1.6.0) dependencies to your 
+Maven project's pom.xml file.
 
 ```java
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.MetadataConverter;
-import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata.DATABASE_NAME;
+import static org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata.TABLE_NAME;
+
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.table.api.*;
-import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.RowRowConverter;
 import org.apache.flink.table.data.utils.JoinedRowData;
@@ -144,14 +157,16 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.sink.FlinkSink;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
-import java.util.*;
 
-import static com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata.DATABASE_NAME;
-import static com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata.TABLE_NAME;
-import static java.util.stream.Collectors.toMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class MySqlCDC2IcebergExample {
   public static void main(String[] args) throws Exception {
@@ -326,7 +341,7 @@ The following example will show how to write CDC data from multiple MySQL tables
 
 **Requirements**
 
-Please add [Flink Connector MySQL 
CDC](https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc/2.3.0)
 and 
[Amoro](https://mvnrepository.com/artifact/com.netease.arctic/arctic-flink-runtime-1.14/0.4.1)
 dependencies to your Maven project's pom.xml file.
+Please add [Flink Connector MySQL CDC](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-mysql-cdc/3.1.1) and [Amoro](https://mvnrepository.com/artifact/org.apache.amoro/amoro-mixed-format-flink-1.17/0.7.0-incubating) dependencies to your Maven project's pom.xml file.
 
 ```java
 import org.apache.amoro.flink.InternalCatalogBuilder;
diff --git a/docs/user-guides/cdc-ingestion.md b/docs/user-guides/cdc-ingestion.md
index 68226a412..23f00a4d0 100644
--- a/docs/user-guides/cdc-ingestion.md
+++ b/docs/user-guides/cdc-ingestion.md
@@ -25,508 +25,36 @@ menu:
  - limitations under the License.
  -->
 # CDC Ingestion
-CDC stands for Change Data Capture, which is a broad concept, as long as it 
can capture the change data, it can be called CDC. [Flink 
CDC](https://github.com/ververica/flink-cdc-connectors) is a Log message-based 
data capture tool, all the inventory and incremental data can be captured. 
Taking MySQL as an example, it can easily capture Binlog data through Debezium 
and process the calculations in real time to send them to the data lake. The 
data lake can then be queried by other engines.
+CDC stands for Change Data Capture. It is a broad concept: any technique that captures changed data can be called CDC.
+[Flink CDC](https://github.com/apache/flink-cdc) is a log-based data capture tool that can capture both existing
+and incremental data. Taking MySQL as an example, it can easily capture Binlog data through
+[Debezium](https://debezium.io/) and [Flink CDC](https://github.com/apache/flink-cdc), process it in real time, and send it to the data lake. The data lake can then be
+queried by other engines.
 
 This section will show how to ingest one table or multiple tables into the 
data lake for both [Iceberg](../iceberg-format/) format and 
[Mixed-Iceberg](../mixed-iceberg-format/) format.
-## Ingest into one table
-### Iceberg format
-The following example will show how MySQL CDC data is written to an Iceberg 
table.
+## Apache Flink CDC
 
-**Requirements**
+[**Apache Flink CDC**](https://nightlies.apache.org/flink/flink-cdc-docs-stable/) is a distributed data integration
+tool for real-time data and batch data. Flink CDC brings the
+simplicity and elegance of data integration via YAML to describe the data movement and transformation.
 
-Please add [Flink SQL Connector MySQL 
CDC](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-connector-mysql-cdc-2.3.0.jar)
 and 
[Iceberg](https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-1.14/1.1.0/iceberg-flink-1.14-1.1.0.jar)
 Jars to the lib directory of the Flink engine package.
+Amoro provides reference code examples showing how to ingest CDC data into different lakehouse table formats; see the
+[**flink-cdc-ingestion**](../engines/flink/flink-cdc-ingestion.md) doc.
 
-```sql
-CREATE TABLE products (
-    id INT,
-    name STRING,
-    description STRING,
-    PRIMARY KEY (id) NOT ENFORCED
-) WITH (
-    'connector' = 'mysql-cdc',
-    'hostname' = 'localhost',
-    'port' = '3306',
-    'username' = 'root',
-    'password' = '123456',
-    'database-name' = 'mydb',
-    'table-name' = 'products'
-);
-  
-CREATE CATALOG iceberg_hadoop_catalog WITH (
-    'type'='iceberg',
-    'catalog-type'='hadoop',
-    'warehouse'='hdfs://nn:8020/warehouse/path',
-    'property-version'='1'
-);
+Amoro also provides the [**Mixed-Iceberg**](../formats/mixed-iceberg.md) format, which you can think of as
+**STREAMING** for Iceberg, and which enhances real-time processing scenarios.
 
-CREATE TABLE IF NOT EXISTS `iceberg_hadoop_catalog`.`default`.`sample` (
-    id INT,
-    name STRING,
-    description STRING,
-    PRIMARY KEY (id) NOT ENFORCED
-);
+## Debezium
 
-INSERT INTO `iceberg_hadoop_catalog`.`default`.`sample` SELECT * FROM products;
-```
+Debezium is an open source distributed platform for change data capture. Start 
it up, point it at your databases, and your apps can start responding to all of 
the inserts, updates, and deletes that other apps commit to your databases. 
Debezium is durable and fast, so your apps can respond quickly and never miss 
an event, even when things go wrong.
 
-### Mixed-Iceberg format
-The following example will show how MySQL CDC data is written to a 
Mixed-Iceberg table.
+### Demo
 
-**Requirements**
+Coming Soon
 
-Please add [Flink SQL Connector MySQL 
CDC](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-connector-mysql-cdc-2.3.0.jar)
 and [Amoro](../../../download/) Jars to the lib directory of the Flink engine 
package.
+## Airbyte
 
-```sql
-CREATE TABLE products (
-    id INT,
-    name STRING,
-    description STRING,
-    PRIMARY KEY (id) NOT ENFORCED
-) WITH (
-    'connector' = 'mysql-cdc',
-    'hostname' = 'localhost',
-    'port' = '3306',
-    'username' = 'root',
-    'password' = '123456',
-    'database-name' = 'mydb',
-    'table-name' = 'products'
-);
+Airbyte is a data integration platform for ELT pipelines from APIs, databases, and files to databases, warehouses, and lakes.
 
-CREATE CATALOG arctic_catalog WITH (
-    'type'='arctic',
-    'metastore.url'='thrift://<ip>:<port>/<catalog_name_in_metastore>'
-); 
-
-CREATE TABLE IF NOT EXISTS `arctic_catalog`.`db`.`test_tb`(
-    id INT,
-    name STRING,
-    description STRING,
-    PRIMARY KEY (id) NOT ENFORCED
-);
-
-INSERT INTO `arctic_catalog`.`db`.`test_tb` SELECT * FROM products;
-```
-
-## Ingest Into multiple tables
-### Iceberg format
-The following example will show how to write CDC data from multiple MySQL 
tables into the corresponding Iceberg table.
-
-**Requirements**
-
-Please add [Flink Connector MySQL 
CDC](https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc/2.3.0)
 and 
[Iceberg](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-1.14/1.1.0)
 dependencies to your Maven project's pom.xml file.
-
-```java
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import 
com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.MetadataConverter;
-import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.table.api.*;
-import org.apache.flink.table.catalog.*;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.conversion.RowRowConverter;
-import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.flink.CatalogLoader;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.sink.FlinkSink;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;
-import java.util.*;
-
-import static 
com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata.DATABASE_NAME;
-import static 
com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata.TABLE_NAME;
-import static java.util.stream.Collectors.toMap;
-
-public class MySqlCDC2IcebergExample {
-  public static void main(String[] args) throws Exception {
-    List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable = 
initSourceTables();
-    Map<String, RowDataDebeziumDeserializeSchema> debeziumDeserializeSchemas = 
getDebeziumDeserializeSchemas(pathAndTable);
-    MySqlSource<RowData> mySqlSource = MySqlSource.<RowData>builder()
-      .hostname("yourHostname")
-      .port(yourPort)
-      .databaseList("test_db")
-      // setting up tables to be captured
-      .tableList("test_db.user", "test_db.product")
-      .username("yourUsername")
-      .password("yourPassword")
-      .deserializer(new 
CompositeDebeziumDeserializationSchema(debeziumDeserializeSchemas))
-      .build();
-
-    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-    // enable checkpoint
-    env.enableCheckpointing(60000);
-    
-    // Split CDC streams by table name
-    SingleOutputStreamOperator<Void> process = env
-      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL 
Source")
-      .setParallelism(4)
-      .process(new SplitCdcStreamFunction(pathAndTable.stream()
-          .collect(toMap(e -> e.f0.toString(),
-              e -> 
RowRowConverter.create(e.f1.getResolvedSchema().toPhysicalRowDataType())))))
-      .name("split stream");
-
-    // create Iceberg sink and insert into CDC data
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put(CatalogProperties.WAREHOUSE_LOCATION, 
"yourWarehouseLocation");
-    properties.put(CatalogProperties.URI, "yourThriftUri");
-    CatalogLoader catalogLoader = CatalogLoader.hadoop("hadoop_catalog", new 
Configuration(), properties);
-    Catalog icebergHadoopCatalog = catalogLoader.loadCatalog();
-    Map<String, TableSchema> sinkTableSchemas = new HashMap<>();
-    sinkTableSchemas.put("user", TableSchema.builder().field("id", 
DataTypes.INT())
-      .field("name", DataTypes.STRING()).field("op_time", 
DataTypes.TIMESTAMP()).build());
-    sinkTableSchemas.put("product", TableSchema.builder().field("productId", 
DataTypes.INT())
-      .field("price", DataTypes.DECIMAL(12, 6)).field("saleCount", 
DataTypes.INT()).build());
-
-    for (Map.Entry<String, TableSchema> entry : sinkTableSchemas.entrySet()) {
-      TableIdentifier identifier = TableIdentifier.of(Namespace.of("test_db"), 
entry.getKey());
-      Table table = icebergHadoopCatalog.loadTable(identifier);
-      TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, 
identifier);
-
-      FlinkSink.forRowData(process.getSideOutput(new 
OutputTag<RowData>(entry.getKey()){}))
-        .tableLoader(tableLoader)
-        .table(table)
-        .append();
-    }
-
-    env.execute("Sync MySQL to the Iceberg table");
-  }
-
-  static class CompositeDebeziumDeserializationSchema
-    implements DebeziumDeserializationSchema<RowData> {
-
-    private final Map<String, RowDataDebeziumDeserializeSchema> 
deserializationSchemaMap;
-
-    public CompositeDebeziumDeserializationSchema(
-      final Map<String, RowDataDebeziumDeserializeSchema> 
deserializationSchemaMap) {
-      this.deserializationSchemaMap = deserializationSchemaMap;
-    }
-
-    @Override
-    public void deserialize(final SourceRecord record, final 
Collector<RowData> out)
-      throws Exception {
-      final Struct value = (Struct) record.value();
-      final Struct source = value.getStruct("source");
-      final String db = source.getString("db");
-      final String table = source.getString("table");
-      if (deserializationSchemaMap == null) {
-        throw new IllegalStateException("deserializationSchemaMap can not be 
null!");
-      }
-      deserializationSchemaMap.get(db + "." + table).deserialize(record, out);
-    }
-
-    @Override
-    public TypeInformation<RowData> getProducedType() {
-      return TypeInformation.of(RowData.class);
-    }
-  }
-
-  static class SplitCdcStreamFunction extends ProcessFunction<RowData, Void> {
-    private final Map<String, RowRowConverter> converters;
-
-    public SplitCdcStreamFunction(final Map<String, RowRowConverter> 
converterMap) {
-      this.converters = converterMap;
-    }
-
-    @Override
-    public void processElement(final RowData rowData,
-                               final ProcessFunction<RowData, Void>.Context 
ctx, final Collector<Void> out)
-      throws Exception {
-      // JoinedRowData like +I{row1=+I(1,2.340000,3), row2=+I(product,test_db)}
-      // so rowData.getArity() - 2 is the tableName field index
-      final String tableName = rowData.getString(rowData.getArity() - 
2).toString();
-      ctx.output(new OutputTag<RowData>(tableName) {},
-        getField(JoinedRowData.class, (JoinedRowData) rowData, "row1"));
-    }
-
-    private static <O, V> V getField(Class<O> clazz, O obj, String fieldName) {
-      try {
-        java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
-        field.setAccessible(true);
-        Object v = field.get(obj);
-        return v == null ? null : (V) v;
-      } catch (NoSuchFieldException | IllegalAccessException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  private static List<Tuple2<ObjectPath, ResolvedCatalogTable>> 
initSourceTables() {
-    List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable = new 
ArrayList<>();
-    // build table "user"
-    Schema userSchema = Schema.newBuilder()
-      .column("id", DataTypes.INT().notNull())
-      .column("name", DataTypes.STRING())
-      .column("op_time", DataTypes.TIMESTAMP())
-      .primaryKey("id")
-      .build();
-    List<Column> userTableCols = Stream.of(
-      Column.physical("id", DataTypes.INT().notNull()),
-      Column.physical("name", DataTypes.STRING()),
-      Column.physical("op_time", 
DataTypes.TIMESTAMP())).collect(Collectors.toList());
-    Schema.UnresolvedPrimaryKey userPrimaryKey = 
userSchema.getPrimaryKey().orElseThrow(() -> new RuntimeException("table user 
required pk "));
-    ResolvedSchema userResolvedSchema = new ResolvedSchema(userTableCols, 
Collections.emptyList(), UniqueConstraint.primaryKey(
-      userPrimaryKey.getConstraintName(), userPrimaryKey.getColumnNames()));
-    ResolvedCatalogTable userTable = new ResolvedCatalogTable(
-      CatalogTable.of(userSchema, "", Collections.emptyList(), new 
HashMap<>()), userResolvedSchema);
-    pathAndTable.add(Tuple2.of(new ObjectPath("test_db", "user"), userTable));
-
-    // build table "product"
-    Schema productSchema = Schema.newBuilder()
-      .column("productId", DataTypes.INT().notNull())
-      .column("price", DataTypes.DECIMAL(12, 6))
-      .column("saleCount", DataTypes.INT())
-      .primaryKey("productId")
-      .build();
-    List<Column> productTableCols = Stream.of(
-      Column.physical("productId", DataTypes.INT().notNull()),
-      Column.physical("price", DataTypes.DECIMAL(12, 6)),
-      Column.physical("saleCount", 
DataTypes.INT())).collect(Collectors.toList());
-    Schema.UnresolvedPrimaryKey productPrimaryKey = 
productSchema.getPrimaryKey().orElseThrow(() -> new RuntimeException("table 
product required pk "));
-    ResolvedSchema productResolvedSchema = new 
ResolvedSchema(productTableCols, Collections.emptyList(), 
UniqueConstraint.primaryKey(
-      productPrimaryKey.getConstraintName(), 
productPrimaryKey.getColumnNames()));
-    ResolvedCatalogTable productTable = new ResolvedCatalogTable(
-      CatalogTable.of(productSchema, "", Collections.emptyList(), new 
HashMap<>()), productResolvedSchema);
-    pathAndTable.add(Tuple2.of(new ObjectPath("test_db", "product"), 
productTable));
-    return pathAndTable;
-  }
-
-  private static Map<String, RowDataDebeziumDeserializeSchema> 
getDebeziumDeserializeSchemas(
-    final List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable) {
-    return pathAndTable.stream()
-      .collect(toMap(e -> e.f0.toString(), e -> 
RowDataDebeziumDeserializeSchema.newBuilder()
-        .setPhysicalRowType(
-          (RowType) 
e.f1.getResolvedSchema().toPhysicalRowDataType().getLogicalType())
-        
.setUserDefinedConverterFactory(MySqlDeserializationConverterFactory.instance())
-        .setMetadataConverters(
-          new MetadataConverter[] {TABLE_NAME.getConverter(), 
DATABASE_NAME.getConverter()})
-        .setResultTypeInfo(TypeInformation.of(RowData.class)).build()));
-  }
-}
-```
-
-### Mixed-Iceberg format
-The following example will show how to write CDC data from multiple MySQL 
tables into the corresponding Mixed-Iceberg table.
-
-**Requirements**
-
-Please add [Flink Connector MySQL 
CDC](https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc/2.3.0)
 and 
[Amoro](https://mvnrepository.com/artifact/com.netease.arctic/arctic-flink-runtime-1.14/0.4.1)
 dependencies to your Maven project's pom.xml file.
-
-```java
-import org.apache.amoro.flink.InternalCatalogBuilder;
-import org.apache.amoro.flink.table.MixedFormatTableLoader;
-import org.apache.amoro.flink.util.MixedFormatUtils;
-import org.apache.amoro.flink.write.FlinkSink;
-import org.apache.amoro.table.TableIdentifier;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
-import 
org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
-import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
-import org.apache.flink.cdc.debezium.table.MetadataConverter;
-import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.table.api.*;
-import org.apache.flink.table.catalog.*;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.conversion.RowRowConverter;
-import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;
-
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.stream.Collectors.toMap;
-import static 
org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata.DATABASE_NAME;
-import static 
org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata.TABLE_NAME;
-
-public class MySqlCDC2MixedIcebergExample {
-    public static void main(String[] args) throws Exception {
-        List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable = 
initSourceTables();
-        Map<String, RowDataDebeziumDeserializeSchema> 
debeziumDeserializeSchemas = getDebeziumDeserializeSchemas(
-                pathAndTable);
-        MySqlSource<RowData> mySqlSource = MySqlSource.<RowData>builder()
-                .hostname("yourHostname")
-                .port(3306)
-                .databaseList("test_db")
-                // setting up tables to be captured
-                .tableList("test_db.user", "test_db.product")
-                .username("yourUsername")
-                .password("yourPassword")
-                .deserializer(new 
CompositeDebeziumDeserializationSchema(debeziumDeserializeSchemas))
-                .build();
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-        // enable checkpoint
-        env.enableCheckpointing(60000);
-
-        // Split CDC streams by table name
-        SingleOutputStreamOperator<Void> process = env
-                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), 
"MySQL Source").setParallelism(4)
-                .process(new SplitCdcStreamFunction(pathAndTable.stream()
-                        .collect(toMap(e -> e.f0.toString(),
-                                e -> 
RowRowConverter.create(e.f1.getResolvedSchema().toPhysicalRowDataType())))))
-                .name("split stream");
-
-        // create Amoro sink and insert into cdc data
-        InternalCatalogBuilder catalogBuilder = 
InternalCatalogBuilder.builder().metastoreUrl(
-                "thrift://<ip>:<port>/<catalog_name_in_metastore>");
-        Map<String, TableSchema> sinkTableSchemas = new HashMap<>();
-        sinkTableSchemas.put("user", TableSchema.builder().field("id", 
DataTypes.INT())
-                .field("name", DataTypes.STRING()).field("op_time", 
DataTypes.TIMESTAMP()).build());
-        sinkTableSchemas.put("product", 
TableSchema.builder().field("productId", DataTypes.INT())
-                .field("price", DataTypes.DECIMAL(12, 6)).field("saleCount", 
DataTypes.INT()).build());
-
-        for (Map.Entry<String, TableSchema> entry : 
sinkTableSchemas.entrySet()) {
-            TableIdentifier tableId =
-                    TableIdentifier.of("yourCatalogName", "yourDatabaseName", 
entry.getKey());
-            MixedFormatTableLoader tableLoader = 
MixedFormatTableLoader.of(tableId, catalogBuilder);
-
-            FlinkSink.forRowData(process.getSideOutput(new 
OutputTag<RowData>(entry.getKey()) {
-                    }))
-                    .flinkSchema(entry.getValue())
-                    .table(MixedFormatUtils.loadMixedTable(tableLoader))
-                    .tableLoader(tableLoader).build();
-        }
-
-        env.execute("Sync MySQL to Mixed-Iceberg table");
-    }
-
-    static class CompositeDebeziumDeserializationSchema
-            implements DebeziumDeserializationSchema<RowData> {
-
-        private final Map<String, RowDataDebeziumDeserializeSchema> 
deserializationSchemaMap;
-
-        public CompositeDebeziumDeserializationSchema(
-                final Map<String, RowDataDebeziumDeserializeSchema> 
deserializationSchemaMap) {
-            this.deserializationSchemaMap = deserializationSchemaMap;
-        }
-
-        @Override
-        public void deserialize(final SourceRecord record, final 
Collector<RowData> out)
-                throws Exception {
-            final Struct value = (Struct) record.value();
-            final Struct source = value.getStruct("source");
-            final String db = source.getString("db");
-            final String table = source.getString("table");
-            if (deserializationSchemaMap == null) {
-                throw new IllegalStateException("deserializationSchemaMap can 
not be null!");
-            }
-            deserializationSchemaMap.get(db + "." + table).deserialize(record, 
out);
-        }
-
-        @Override
-        public TypeInformation<RowData> getProducedType() {
-            return TypeInformation.of(RowData.class);
-        }
-    }
-
-    static class SplitCdcStreamFunction extends ProcessFunction<RowData, Void> 
{
-        private final Map<String, RowRowConverter> converters;
-
-        public SplitCdcStreamFunction(final Map<String, RowRowConverter> 
converterMap) {
-            this.converters = converterMap;
-        }
-
-        @Override
-        public void processElement(final RowData rowData,
-                                   final ProcessFunction<RowData, 
Void>.Context ctx, final Collector<Void> out)
-                throws Exception {
-            // JoinedRowData like +I{row1=+I(1,2.340000,3), 
row2=+I(product,test_db)}
-            // so rowData.getArity() - 2 is the tableName field index
-            final String tableName = rowData.getString(rowData.getArity() - 
2).toString();
-            ctx.output(new OutputTag<RowData>(tableName) {
-                       },
-                    getField(JoinedRowData.class, (JoinedRowData) rowData, 
"row1"));
-        }
-
-        private static <O, V> V getField(Class<O> clazz, O obj, String 
fieldName) {
-            try {
-                java.lang.reflect.Field field = 
clazz.getDeclaredField(fieldName);
-                field.setAccessible(true);
-                Object v = field.get(obj);
-                return v == null ? null : (V) v;
-            } catch (NoSuchFieldException | IllegalAccessException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private static List<Tuple2<ObjectPath, ResolvedCatalogTable>> 
initSourceTables() {
-        List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable = new 
ArrayList<>();
-        // build table "user"
-        Schema userSchema = Schema.newBuilder()
-                .column("id", DataTypes.INT().notNull())
-                .column("name", DataTypes.STRING())
-                .column("op_time", DataTypes.TIMESTAMP())
-                .primaryKey("id")
-                .build();
-        List<Column> userTableCols = Stream.of(
-                Column.physical("id", DataTypes.INT().notNull()),
-                Column.physical("name", DataTypes.STRING()),
-                Column.physical("op_time", 
DataTypes.TIMESTAMP())).collect(Collectors.toList());
-        Schema.UnresolvedPrimaryKey userPrimaryKey = 
userSchema.getPrimaryKey().orElseThrow(() -> new RuntimeException("table user 
required pk "));
-        ResolvedSchema userResolvedSchema = new ResolvedSchema(userTableCols, 
Collections.emptyList(), UniqueConstraint.primaryKey(
-                userPrimaryKey.getConstraintName(), 
userPrimaryKey.getColumnNames()));
-        ResolvedCatalogTable userTable = new ResolvedCatalogTable(
-                CatalogTable.of(userSchema, "", Collections.emptyList(), new 
HashMap<>()), userResolvedSchema);
-        pathAndTable.add(Tuple2.of(new ObjectPath("test_db", "user"), 
userTable));
-
-        // build table "product"
-        Schema productSchema = Schema.newBuilder()
-                .column("productId", DataTypes.INT().notNull())
-                .column("price", DataTypes.DECIMAL(12, 6))
-                .column("saleCount", DataTypes.INT())
-                .primaryKey("productId")
-                .build();
-        List<Column> productTableCols = Stream.of(
-                Column.physical("productId", DataTypes.INT().notNull()),
-                Column.physical("price", DataTypes.DECIMAL(12, 6)),
-                Column.physical("saleCount", 
DataTypes.INT())).collect(Collectors.toList());
-        Schema.UnresolvedPrimaryKey productPrimaryKey = 
productSchema.getPrimaryKey().orElseThrow(() -> new RuntimeException("table 
product required pk "));
-        ResolvedSchema productResolvedSchema = new 
ResolvedSchema(productTableCols, Collections.emptyList(), 
UniqueConstraint.primaryKey(
-                productPrimaryKey.getConstraintName(), 
productPrimaryKey.getColumnNames()));
-        ResolvedCatalogTable productTable = new ResolvedCatalogTable(
-                CatalogTable.of(productSchema, "", Collections.emptyList(), 
new HashMap<>()), productResolvedSchema);
-        pathAndTable.add(Tuple2.of(new ObjectPath("test_db", "product"), 
productTable));
-        return pathAndTable;
-    }
-
-    private static Map<String, RowDataDebeziumDeserializeSchema> 
getDebeziumDeserializeSchemas(
-            final List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable) 
{
-        return pathAndTable.stream()
-                .collect(toMap(e -> e.f0.toString(), e -> 
RowDataDebeziumDeserializeSchema.newBuilder()
-                        .setPhysicalRowType(
-                                (RowType) 
e.f1.getResolvedSchema().toPhysicalRowDataType().getLogicalType())
-                        
.setUserDefinedConverterFactory(MySqlDeserializationConverterFactory.instance())
-                        .setMetadataConverters(
-                                new 
MetadataConverter[]{TABLE_NAME.getConverter(), DATABASE_NAME.getConverter()})
-                        
.setResultTypeInfo(TypeInformation.of(RowData.class)).build()));
-    }
-}
-```
\ No newline at end of file
+### Demo
+Coming Soon

