huyuanfeng2018 commented on code in PR #3103: URL: https://github.com/apache/amoro/pull/3103#discussion_r1724539673
########## docs/engines/flink/flink-cdc-ingestion.md: ########## @@ -0,0 +1,547 @@ +--- +title: "Flink CDC Ingestion" +url: flink-cdc-ingestion +aliases: + - "flink/cdc" +menu: + main: + parent: User Guides + weight: 400 +--- +<!-- + - Licensed to the Apache Software Foundation (ASF) under one or more + - contributor license agreements. See the NOTICE file distributed with + - this work for additional information regarding copyright ownership. + - The ASF licenses this file to You under the Apache License, Version 2.0 + - (the "License"); you may not use this file except in compliance with + - the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + --> +# Apache CDC Ingestion +CDC stands for Change Data Capture, which is a broad concept, as long as it can capture the change data, it can be called CDC. [Flink CDC](https://github.com/apache/flink-cdc) is a Log message-based data capture tool, all the inventory and incremental data can be captured. Taking MySQL as an example, it can easily capture Binlog data through Debezium and process the calculations in real time to send them to the data lake. The data lake can then be queried by other engines. + +This section will show how to ingest one table or multiple tables into the data lake for both [Iceberg](../iceberg-format/) format and [Mixed-Iceberg](../mixed-iceberg-format/) format. +## Ingest into one table +### Iceberg format +The following example will show how [MySQL CDC](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mysql-cdc/) data is written to an Iceberg table. 
+ +**Requirements** + +Please add [Flink SQL Connector MySQL CDC](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-mysql-cdc) and [Iceberg](https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-1.18/1.6.0/iceberg-flink-1.18-1.6.0.jar) Jars to the lib directory of the Flink engine package. + +```sql +CREATE TABLE products ( + id INT, + name STRING, + description STRING, + PRIMARY KEY (id) NOT ENFORCED +) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = 'localhost', + 'port' = '3306', + 'username' = 'root', + 'password' = '123456', + 'database-name' = 'mydb', + 'table-name' = 'products' +); + +CREATE CATALOG iceberg_hadoop_catalog WITH ( + 'type'='iceberg', + 'catalog-type'='hadoop', + 'warehouse'='hdfs://nn:8020/warehouse/path', + 'property-version'='1' +); + +CREATE TABLE IF NOT EXISTS `iceberg_hadoop_catalog`.`default`.`sample` ( + id INT, + name STRING, + description STRING, + PRIMARY KEY (id) NOT ENFORCED +); + +INSERT INTO `iceberg_hadoop_catalog`.`default`.`sample` SELECT * FROM products; +``` + +### Mixed-Iceberg format +The following example will show how MySQL CDC data is written to a Mixed-Iceberg table. + +**Requirements** + +Please add [Flink SQL Connector MySQL CDC](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-mysql-cdc) and [Amoro](../../../download/) Jars to the lib directory of the Flink engine package. 
+ +```sql +CREATE TABLE products ( + id INT, + name STRING, + description STRING, + PRIMARY KEY (id) NOT ENFORCED +) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = 'localhost', + 'port' = '3306', + 'username' = 'root', + 'password' = '123456', + 'database-name' = 'mydb', + 'table-name' = 'products' +); + +CREATE CATALOG amoro_catalog WITH ( + 'type'='amoro', + 'metastore.url'='thrift://<ip>:<port>/<catalog_name_in_metastore>' +); + +CREATE TABLE IF NOT EXISTS `amoro_catalog`.`db`.`test_tb`( + id INT, + name STRING, + description STRING, + PRIMARY KEY (id) NOT ENFORCED +); + +INSERT INTO `amoro_catalog`.`db`.`test_tb` SELECT * FROM products; +``` + +## Ingest Into multiple tables +### Iceberg format +The following example will show how to write CDC data from multiple MySQL tables into the corresponding Iceberg table. + +**Requirements** + +Please add [Flink Connector MySQL CDC](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-mysql-cdc) +and [Iceberg](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-1.18/1.6.0) dependencies to your +Maven project's pom.xml file. 
+ +```java +import static java.util.stream.Collectors.toMap; +import static org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata.DATABASE_NAME; +import static org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata.TABLE_NAME; + +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.table.MetadataConverter; +import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.RowRowConverter; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import 
org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MySqlCDC2IcebergExample { + public static void main(String[] args) throws Exception { + List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable = initSourceTables(); + Map<String, RowDataDebeziumDeserializeSchema> debeziumDeserializeSchemas = getDebeziumDeserializeSchemas(pathAndTable); + MySqlSource<RowData> mySqlSource = MySqlSource.<RowData>builder() + .hostname("yourHostname") + .port(yourPort) + .databaseList("test_db") + // setting up tables to be captured + .tableList("test_db.user", "test_db.product") + .username("yourUsername") + .password("yourPassword") + .deserializer(new CompositeDebeziumDeserializationSchema(debeziumDeserializeSchemas)) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(60000); + + // Split CDC streams by table name + SingleOutputStreamOperator<Void> process = env + .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") + .setParallelism(4) + .process(new SplitCdcStreamFunction(pathAndTable.stream() + .collect(toMap(e -> e.f0.toString(), + e -> RowRowConverter.create(e.f1.getResolvedSchema().toPhysicalRowDataType()))))) + .name("split stream"); + + // create Iceberg sink and insert into CDC data + Map<String, String> properties = Maps.newHashMap(); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, "yourWarehouseLocation"); + 
properties.put(CatalogProperties.URI, "yourThriftUri"); + CatalogLoader catalogLoader = CatalogLoader.hadoop("hadoop_catalog", new Configuration(), properties); + Catalog icebergHadoopCatalog = catalogLoader.loadCatalog(); + Map<String, TableSchema> sinkTableSchemas = new HashMap<>(); + sinkTableSchemas.put("user", TableSchema.builder().field("id", DataTypes.INT()) + .field("name", DataTypes.STRING()).field("op_time", DataTypes.TIMESTAMP()).build()); + sinkTableSchemas.put("product", TableSchema.builder().field("productId", DataTypes.INT()) + .field("price", DataTypes.DECIMAL(12, 6)).field("saleCount", DataTypes.INT()).build()); + + for (Map.Entry<String, TableSchema> entry : sinkTableSchemas.entrySet()) { + TableIdentifier identifier = TableIdentifier.of(Namespace.of("test_db"), entry.getKey()); + Table table = icebergHadoopCatalog.loadTable(identifier); + TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, identifier); + + FlinkSink.forRowData(process.getSideOutput(new OutputTag<RowData>(entry.getKey()){})) + .tableLoader(tableLoader) + .table(table) + .append(); + } + + env.execute("Sync MySQL to the Iceberg table"); + } + + static class CompositeDebeziumDeserializationSchema + implements DebeziumDeserializationSchema<RowData> { + + private final Map<String, RowDataDebeziumDeserializeSchema> deserializationSchemaMap; + + public CompositeDebeziumDeserializationSchema( + final Map<String, RowDataDebeziumDeserializeSchema> deserializationSchemaMap) { + this.deserializationSchemaMap = deserializationSchemaMap; + } + + @Override + public void deserialize(final SourceRecord record, final Collector<RowData> out) + throws Exception { + final Struct value = (Struct) record.value(); + final Struct source = value.getStruct("source"); + final String db = source.getString("db"); + final String table = source.getString("table"); + if (deserializationSchemaMap == null) { + throw new IllegalStateException("deserializationSchemaMap can not be null!"); + } + 
deserializationSchemaMap.get(db + "." + table).deserialize(record, out); + } + + @Override + public TypeInformation<RowData> getProducedType() { + return TypeInformation.of(RowData.class); + } + } + + static class SplitCdcStreamFunction extends ProcessFunction<RowData, Void> { + private final Map<String, RowRowConverter> converters; + + public SplitCdcStreamFunction(final Map<String, RowRowConverter> converterMap) { + this.converters = converterMap; + } + + @Override + public void processElement(final RowData rowData, + final ProcessFunction<RowData, Void>.Context ctx, final Collector<Void> out) + throws Exception { + // JoinedRowData like +I{row1=+I(1,2.340000,3), row2=+I(product,test_db)} + // so rowData.getArity() - 2 is the tableName field index + final String tableName = rowData.getString(rowData.getArity() - 2).toString(); + ctx.output(new OutputTag<RowData>(tableName) {}, + getField(JoinedRowData.class, (JoinedRowData) rowData, "row1")); + } + + private static <O, V> V getField(Class<O> clazz, O obj, String fieldName) { + try { + java.lang.reflect.Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + Object v = field.get(obj); + return v == null ? 
null : (V) v; + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + + private static List<Tuple2<ObjectPath, ResolvedCatalogTable>> initSourceTables() { + List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable = new ArrayList<>(); + // build table "user" + Schema userSchema = Schema.newBuilder() + .column("id", DataTypes.INT().notNull()) + .column("name", DataTypes.STRING()) + .column("op_time", DataTypes.TIMESTAMP()) + .primaryKey("id") + .build(); + List<Column> userTableCols = Stream.of( + Column.physical("id", DataTypes.INT().notNull()), + Column.physical("name", DataTypes.STRING()), + Column.physical("op_time", DataTypes.TIMESTAMP())).collect(Collectors.toList()); + Schema.UnresolvedPrimaryKey userPrimaryKey = userSchema.getPrimaryKey().orElseThrow(() -> new RuntimeException("table user required pk ")); + ResolvedSchema userResolvedSchema = new ResolvedSchema(userTableCols, Collections.emptyList(), UniqueConstraint.primaryKey( + userPrimaryKey.getConstraintName(), userPrimaryKey.getColumnNames())); + ResolvedCatalogTable userTable = new ResolvedCatalogTable( + CatalogTable.of(userSchema, "", Collections.emptyList(), new HashMap<>()), userResolvedSchema); + pathAndTable.add(Tuple2.of(new ObjectPath("test_db", "user"), userTable)); + + // build table "product" + Schema productSchema = Schema.newBuilder() + .column("productId", DataTypes.INT().notNull()) + .column("price", DataTypes.DECIMAL(12, 6)) + .column("saleCount", DataTypes.INT()) + .primaryKey("productId") + .build(); + List<Column> productTableCols = Stream.of( + Column.physical("productId", DataTypes.INT().notNull()), + Column.physical("price", DataTypes.DECIMAL(12, 6)), + Column.physical("saleCount", DataTypes.INT())).collect(Collectors.toList()); + Schema.UnresolvedPrimaryKey productPrimaryKey = productSchema.getPrimaryKey().orElseThrow(() -> new RuntimeException("table product required pk ")); + ResolvedSchema productResolvedSchema = new 
ResolvedSchema(productTableCols, Collections.emptyList(), UniqueConstraint.primaryKey( + productPrimaryKey.getConstraintName(), productPrimaryKey.getColumnNames())); + ResolvedCatalogTable productTable = new ResolvedCatalogTable( + CatalogTable.of(productSchema, "", Collections.emptyList(), new HashMap<>()), productResolvedSchema); + pathAndTable.add(Tuple2.of(new ObjectPath("test_db", "product"), productTable)); + return pathAndTable; + } + + private static Map<String, RowDataDebeziumDeserializeSchema> getDebeziumDeserializeSchemas( + final List<Tuple2<ObjectPath, ResolvedCatalogTable>> pathAndTable) { + return pathAndTable.stream() + .collect(toMap(e -> e.f0.toString(), e -> RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType( + (RowType) e.f1.getResolvedSchema().toPhysicalRowDataType().getLogicalType()) + .setUserDefinedConverterFactory(MySqlDeserializationConverterFactory.instance()) + .setMetadataConverters( + new MetadataConverter[] {TABLE_NAME.getConverter(), DATABASE_NAME.getConverter()}) + .setResultTypeInfo(TypeInformation.of(RowData.class)).build())); + } +} +``` + +### Mixed-Iceberg format +The following example will show how to write CDC data from multiple MySQL tables into the corresponding Mixed-Iceberg table. + +**Requirements** + +Please add [Flink Connector MySQL CDC](https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc/2.3.0) and [Amoro](https://mvnrepository.com/artifact/com.netease.arctic/arctic-flink-runtime-1.14/0.4.1) dependencies to your Maven project's pom.xml file. Review Comment: `https://mvnrepository.com/artifact/com.netease.arctic/arctic-flink-runtime-1.14/0.4.1` Is this link correct? 
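For context on the question above: the `com.netease.arctic` coordinates predate the project's rename to Amoro, and the `com.ververica` 2.3.0 CDC link in the same sentence does not match the `org.apache.flink.cdc` packages imported by the Iceberg example earlier in this file. A minimal sketch of a CDC dependency consistent with those imports is shown here; the version number is an assumption and should be replaced with whichever release the docs are tested against. For the Amoro runtime, the single-table section already points to the project download page (`../../../download/`), which may be the safer link to reuse here.

```xml
<!-- Sketch only: the version below is an assumption, not taken from the PR. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>3.1.0</version>
</dependency>
```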
########## docs/engines/flink/flink-cdc-ingestion.md: ########## @@ -0,0 +1,547 @@ +--- +title: "Flink CDC Ingestion" +url: flink-cdc-ingestion +aliases: + - "flink/cdc" +menu: + main: + parent: User Guides + weight: 400 +--- +<!-- + - Licensed to the Apache Software Foundation (ASF) under one or more + - contributor license agreements. See the NOTICE file distributed with + - this work for additional information regarding copyright ownership. + - The ASF licenses this file to You under the Apache License, Version 2.0 + - (the "License"); you may not use this file except in compliance with + - the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, software + - distributed under the License is distributed on an "AS IS" BASIS, + - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + - See the License for the specific language governing permissions and + - limitations under the License. + --> +# Apache CDC Ingestion +CDC stands for Change Data Capture, which is a broad concept, as long as it can capture the change data, it can be called CDC. [Flink CDC](https://github.com/apache/flink-cdc) is a Log message-based data capture tool, all the inventory and incremental data can be captured. Taking MySQL as an example, it can easily capture Binlog data through Debezium and process the calculations in real time to send them to the data lake. The data lake can then be queried by other engines. + +This section will show how to ingest one table or multiple tables into the data lake for both [Iceberg](../iceberg-format/) format and [Mixed-Iceberg](../mixed-iceberg-format/) format. +## Ingest into one table +### Iceberg format +The following example will show how [MySQL CDC](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mysql-cdc/) data is written to an Iceberg table. 
Review Comment: Maybe the link should point to a specific version?
