leonardBang commented on a change in pull request #18738: URL: https://github.com/apache/flink/pull/18738#discussion_r806704754
########## File path: docs/content/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,270 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: + - /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service +providing a real-time data mesh platform, which uses replication to keep data highly available, and +enabling real-time analysis. Customers can design, execute, and monitor their data replication and +stream data processing solutions without the need to allocate or manage compute environments. Ogg +provides a format schema for changelog and supports to serialize messages using JSON. + +Flink supports to interpret Ogg JSON as INSERT/UPDATE/DELETE messages into Flink SQL system. This is +useful in many cases to leverage this feature, such as + +- synchronizing incremental data from databases to other systems +- auditing logs +- real-time materialized views on databases +- temporal join changing history of a database table and so on. + +Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON, and emit +to external systems like Kafka. However, currently Flink can't combine UPDATE_BEFORE and +UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER +as DELETE and INSERT Ogg messages. 
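A minimal sketch of such an encoding sink, assuming a Kafka broker at localhost:9092 and illustrative table/topic names (the schema mirrors the `PRODUCTS` example below); this is not part of the reviewed page, just an illustration of the paragraph above:

```sql
-- Illustrative sink table: rows emitted into it are serialized as Ogg JSON.
-- Per the paragraph above, an UPDATE on the incoming changelog is written
-- out as a DELETE ("D") message followed by an INSERT ("I") message.
CREATE TABLE ogg_products_sink (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_ogg_out',   -- hypothetical topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'ogg-json'
);

-- 'some_changelog_source' is a placeholder for any changelog-producing table
INSERT INTO ogg_products_sink SELECT * FROM some_changelog_source;
```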
+ +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*Note: please refer +to [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html) +about how to set up an Ogg Kafka handler to synchronize changelog to Kafka topics.* + + +How to use Ogg format +---------------- + +Ogg provides a unified format for changelog, here is a simple example for an update operation +captured from an Oracle `PRODUCTS` table in JSON format: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*Note: please refer +to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) Review comment: hint: please update the link ########## File path: docs/content/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,270 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: + - /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service Review comment: ```suggestion [Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a ogg) is a managed service ``` ########## File path: docs/content.zh/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,256 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: +- /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 +该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 + +Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如 + +- 将增量数据从数据库同步到其他系统 +- 日志审计 +- 数据库的实时物化视图 +- 关联维度数据库的变更历史,等等 + +Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 +但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER +分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。 + +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*注意: 请参考 [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html), +了解如何设置 Ogg Kafka handler 来将变更日志同步到 Kafka 的 Topic。* + + +How to use Ogg format +---------------- + +Ogg 为变更日志提供了统一的格式, 这是一个 JSON 格式的从 Oracle `PRODUCTS` 表捕获的更新操作的简单示例: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*注意:请参考 [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) +了解每个字段的含义.* + +Oracle `PRODUCTS` 表 有 4 列 (`id`, `name`, `description` and `weight`). 上面的 JSON 消息是 `PRODUCTS` 表上的一条更新事件,其中 `id = 111` 的行的 +`weight` 值从 `5.18` 更改为 `5.15`. 假设此消息已同步到 Kafka 的 Topic `products_ogg`, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。 + +```sql +CREATE TABLE topic_products ( + -- schema is totally the same to the MySQL "products" table Review comment: ```suggestion -- schema is totally the same to the Oracle "products" table ``` ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.json.ogg; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.ogg.OggJsonDeserializationSchema.MetadataConverter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.table.utils.DateTimeUtils; +import org.apache.flink.types.RowKind; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** {@link DecodingFormat} for Ogg using JSON encoding. */ +public class OggJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> { + + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + private List<String> metadataKeys; + + // -------------------------------------------------------------------------------------------- + // Ogg-specific attributes + // -------------------------------------------------------------------------------------------- + + private final boolean ignoreParseErrors; + private final TimestampFormat timestampFormat; + + public OggJsonDecodingFormat(boolean ignoreParseErrors, TimestampFormat timestampFormat) { + this.ignoreParseErrors = ignoreParseErrors; + this.timestampFormat = timestampFormat; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, DataType physicalDataType) { + + final List<ReadableMetadata> readableMetadata = + metadataKeys.stream() + .map( + k -> + Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .collect(Collectors.toList()); + + final List<DataTypes.Field> metadataFields = + readableMetadata.stream() + .map(m -> DataTypes.FIELD(m.key, m.dataType)) + .collect(Collectors.toList()); + + final DataType producedDataType = + DataTypeUtils.appendRowFields(physicalDataType, metadataFields); + + final TypeInformation<RowData> producedTypeInfo = + context.createTypeInformation(producedDataType); + + return new OggJsonDeserializationSchema( + physicalDataType, + readableMetadata, + producedTypeInfo, + ignoreParseErrors, + timestampFormat); + } + + // -------------------------------------------------------------------------------------------- + // Metadata handling + // -------------------------------------------------------------------------------------------- + + @Override + public Map<String, DataType> listReadableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyReadableMetadata(List<String> metadataKeys) { + this.metadataKeys = metadataKeys; + } + + @Override + 
public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + /** List of metadata that can be read with this format. */ + enum ReadableMetadata { + TABLE( + "table", + DataTypes.STRING().nullable(), + DataTypes.FIELD("table", DataTypes.STRING()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int pos) { + return row.getString(pos); + } + }), + + PRIMARY_KEYS( + "primary_keys", Review comment: ```suggestion "primary-keys", ``` We use '-' in Flink instead of '_', like `ingestion-timestamp` and `event-timestamp` ########## File path: docs/content/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,270 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: + - /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service +providing a real-time data mesh platform, which uses replication to keep data highly available, and +enabling real-time analysis. Customers can design, execute, and monitor their data replication and +stream data processing solutions without the need to allocate or manage compute environments. Ogg +provides a format schema for changelog and supports to serialize messages using JSON. + +Flink supports to interpret Ogg JSON as INSERT/UPDATE/DELETE messages into Flink SQL system. This is +useful in many cases to leverage this feature, such as + +- synchronizing incremental data from databases to other systems +- auditing logs +- real-time materialized views on databases +- temporal join changing history of a database table and so on. + +Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON, and emit +to external systems like Kafka. However, currently Flink can't combine UPDATE_BEFORE and +UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER +as DELETE and INSERT Ogg messages. 
+ +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*Note: please refer +to [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html) +about how to set up an Ogg Kafka handler to synchronize changelog to Kafka topics.* + + +How to use Ogg format +---------------- + +Ogg provides a unified format for changelog, here is a simple example for an update operation +captured from an Oracle `PRODUCTS` table in JSON format: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*Note: please refer +to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) +about the meaning of each field.* + +The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON +message is an update change event on the `PRODUCTS` table where the `weight` value of the row +with `id = 111` is changed from `5.18` to `5.15`. Assuming this messages is synchronized to Kafka +topic `products_ogg`, then we can use the following DDL to consume this topic and interpret the +change events. + +```sql +CREATE TABLE topic_products ( + -- schema is totally the same to the MySQL "products" table Review comment: ```suggestion -- schema is totally the same to the Oracle "products" table ``` ########## File path: docs/content/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,270 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: + - /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service +providing a real-time data mesh platform, which uses replication to keep data highly available, and +enabling real-time analysis. Customers can design, execute, and monitor their data replication and +stream data processing solutions without the need to allocate or manage compute environments. Ogg +provides a format schema for changelog and supports to serialize messages using JSON. + +Flink supports to interpret Ogg JSON as INSERT/UPDATE/DELETE messages into Flink SQL system. 
This is +useful in many cases to leverage this feature, such as + +- synchronizing incremental data from databases to other systems +- auditing logs +- real-time materialized views on databases +- temporal join changing history of a database table and so on. + +Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON, and emit +to external systems like Kafka. However, currently Flink can't combine UPDATE_BEFORE and +UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER +as DELETE and INSERT Ogg messages. + +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*Note: please refer +to [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html) +about how to set up an Ogg Kafka handler to synchronize changelog to Kafka topics.* + + +How to use Ogg format +---------------- + +Ogg provides a unified format for changelog, here is a simple example for an update operation +captured from an Oracle `PRODUCTS` table in JSON format: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*Note: please refer +to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) +about the meaning of each field.* + +The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON +message is an update change event on the `PRODUCTS` table where the `weight` value of the row +with `id = 111` is changed from `5.18` to `5.15`. Assuming this messages is synchronized to Kafka +topic `products_ogg`, then we can use the following DDL to consume this topic and interpret the +change events. + +```sql +CREATE TABLE topic_products ( + -- schema is totally the same to the MySQL "products" table + id BIGINT, + name STRING, + description STRING, + weight DECIMAL(10, 2) +) WITH ( + 'connector' = 'kafka', + 'topic' = 'products_ogg', + 'properties.bootstrap.servers' = 'localhost:9092', + 'properties.group.id' = 'testGroup', + 'format' = 'ogg-json' +) +``` + +After registering the topic as a Flink table, then you can consume the Ogg messages as a changelog +source. + +```sql +-- a real-time materialized view on the Oracle "PRODUCTS" +-- which calculate the latest average of weight for the same products +SELECT name, AVG(weight) +FROM topic_products +GROUP BY name; + +-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to +-- Elasticsearch "products" index for future searching +INSERT INTO elasticsearch_products +SELECT * +FROM topic_products; +``` + +Available Metadata +------------------ + +The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. + +<span class="label label-danger">Attention</span> Format metadata fields are only available if the +corresponding connector forwards format metadata. Currently, only the Kafka connector is able to +expose metadata fields for its value format. 
+ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Key</th> + <th class="text-center" style="width: 40%">Data Type</th> + <th class="text-center" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><code>table</code></td> + <td><code>STRING NULL </code></td> + <td>Contains fully qualified table name. The format of the fully qualified table name is: + CATALOG NAME.SCHEMA NAME.TABLE NAME</td> + </tr> + <tr> + <td><code>primary_keys</code></td> + <td><code>ARRAY<STRING> NULL</code></td> + <td>An array variable holding the column names of the primary keys of the source table. + The primary_keys field is only include in the JSON output if the includePrimaryKeys + configuration property is set to true.</td> + </tr> Review comment: ```suggestion <td><code>primary-keys</code></td> <td><code>ARRAY<STRING> NULL</code></td> <td>An array variable holding the column names of the primary keys of the source table. The primary-keys field is only include in the JSON output if the includePrimaryKeys configuration property is set to true.</td> </tr> ``` ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json.ogg; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.formats.json.ogg.OggJsonDecodingFormat.ReadableMetadata; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +/** + * Deserialization schema from Ogg JSON to Flink Table/SQL internal data structure {@link RowData}. + * The deserialization schema knows Ogg's schema definition and can extract the database data and + * convert into {@link RowData} with {@link RowKind}. + * + * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields. 
+ * + * <p>Failures during deserialization are forwarded as wrapped IOExceptions. + * + * @see <a href="https://www.oracle.com/cn/middleware/technologies/goldengate/overview.html">Ogg</a> + */ +@Internal +public final class OggJsonDeserializationSchema implements DeserializationSchema<RowData> { + private static final long serialVersionUID = 1L; + + private static final String OP_CREATE = "I"; // insert + private static final String OP_UPDATE = "U"; // update + private static final String OP_DELETE = "D"; // delete + private static final String OP_TRUNCATE = "T"; // truncate + + private static final String REPLICA_IDENTITY_EXCEPTION = + "The \"before\" field of %s message is null, " + + "if you are using Ogg Postgres Connector, " + + "please check the Postgres table has been set REPLICA IDENTITY to FULL level."; + + /** The deserializer to deserialize Ogg JSON data. */ + private final JsonRowDataDeserializationSchema jsonDeserializer; + + /** Flag that indicates that an additional projection is required for metadata. */ + private final boolean hasMetadata; + + /** Metadata to be extracted for every record. */ + private final MetadataConverter[] metadataConverters; + + /** {@link TypeInformation} of the produced {@link RowData} (physical + metadata). */ + private final TypeInformation<RowData> producedTypeInfo; + + /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ + private final boolean ignoreParseErrors; + + public OggJsonDeserializationSchema( + DataType physicalDataType, + List<ReadableMetadata> requestedMetadata, + TypeInformation<RowData> producedTypeInfo, + boolean ignoreParseErrors, + TimestampFormat timestampFormat) { + final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata); + this.jsonDeserializer = + new JsonRowDataDeserializationSchema( + jsonRowType, + // the result type is never used, so it's fine to pass in the produced type + // info + producedTypeInfo, + false, // ignoreParseErrors already contains the functionality of + // failOnMissingField + ignoreParseErrors, + timestampFormat); + this.hasMetadata = requestedMetadata.size() > 0; + this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata); + this.producedTypeInfo = producedTypeInfo; + this.ignoreParseErrors = ignoreParseErrors; + } + + private static RowType createJsonRowType( + DataType physicalDataType, List<ReadableMetadata> readableMetadata) { + DataType root = + DataTypes.ROW( + DataTypes.FIELD("before", physicalDataType), + DataTypes.FIELD("after", physicalDataType), + DataTypes.FIELD("op_type", DataTypes.STRING())); + // append fields that are required for reading metadata in the root + final List<DataTypes.Field> rootMetadataFields = + readableMetadata.stream() + .map(m -> m.requiredJsonField) + .distinct() + .collect(Collectors.toList()); + return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType(); + } + + private static MetadataConverter[] createMetadataConverters( + RowType jsonRowType, List<ReadableMetadata> requestedMetadata) { + return requestedMetadata.stream() + .map(m -> convertInRoot(jsonRowType, m)) + .toArray(MetadataConverter[]::new); + } + + private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) { + final int pos = findFieldPos(metadata, jsonRowType); + return new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData root, int unused) { + return 
metadata.converter.convert(root, pos); + } + }; + } + + private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) { + return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName()); + } + + @Override + public RowData deserialize(byte[] message) { + throw new RuntimeException( + "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead."); + } + + @Override + public void deserialize(byte[] message, Collector<RowData> out) throws IOException { + if (message == null || message.length == 0) { + // skip tombstone messages + return; + } + try { + GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message); + + GenericRowData before = (GenericRowData) row.getField(0); + GenericRowData after = (GenericRowData) row.getField(1); + String op = row.getField(2).toString(); + if (OP_CREATE.equals(op)) { + after.setRowKind(RowKind.INSERT); + emitRow(row, after, out); + } else if (OP_UPDATE.equals(op)) { + if (before == null) { + throw new IllegalStateException( + String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE")); + } + before.setRowKind(RowKind.UPDATE_BEFORE); + after.setRowKind(RowKind.UPDATE_AFTER); + emitRow(row, before, out); + emitRow(row, after, out); + } else if (OP_DELETE.equals(op)) { + if (before == null) { + throw new IllegalStateException( + String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE")); + } + before.setRowKind(RowKind.DELETE); + emitRow(row, before, out); + } else if (OP_TRUNCATE.equals(op)) { + // flink can't interpret such type of record, skip the record Review comment: print a WARN LOG ? ########## File path: docs/content.zh/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,256 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: +- /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 Review comment: ```suggestion [Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 ``` ########## File path: docs/content/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,270 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: + - /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service +providing a real-time data mesh platform, which uses replication to keep data highly available, and +enabling real-time analysis. Customers can design, execute, and monitor their data replication and +stream data processing solutions without the need to allocate or manage compute environments. Ogg +provides a format schema for changelog and supports to serialize messages using JSON. + +Flink supports to interpret Ogg JSON as INSERT/UPDATE/DELETE messages into Flink SQL system. This is +useful in many cases to leverage this feature, such as + +- synchronizing incremental data from databases to other systems +- auditing logs +- real-time materialized views on databases +- temporal join changing history of a database table and so on. + +Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON, and emit +to external systems like Kafka. However, currently Flink can't combine UPDATE_BEFORE and +UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER +as DELETE and INSERT Ogg messages. + +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*Note: please refer +to [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html) +about how to set up an Ogg Kafka handler to synchronize changelog to Kafka topics.* + + +How to use Ogg format +---------------- + +Ogg provides a unified format for changelog, here is a simple example for an update operation +captured from an Oracle `PRODUCTS` table in JSON format: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*Note: please refer +to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) +about the meaning of each field.* + +The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON +message is an update change event on the `PRODUCTS` table where the `weight` value of the row +with `id = 111` is changed from `5.18` to `5.15`. Assuming this messages is synchronized to Kafka +topic `products_ogg`, then we can use the following DDL to consume this topic and interpret the +change events. 
+ +```sql +CREATE TABLE topic_products ( + -- schema is totally the same to the MySQL "products" table + id BIGINT, + name STRING, + description STRING, + weight DECIMAL(10, 2) +) WITH ( + 'connector' = 'kafka', + 'topic' = 'products_ogg', + 'properties.bootstrap.servers' = 'localhost:9092', + 'properties.group.id' = 'testGroup', + 'format' = 'ogg-json' +) +``` + +After registering the topic as a Flink table, then you can consume the Ogg messages as a changelog +source. + +```sql +-- a real-time materialized view on the Oracle "PRODUCTS" +-- which calculate the latest average of weight for the same products +SELECT name, AVG(weight) +FROM topic_products +GROUP BY name; + +-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to +-- Elasticsearch "products" index for future searching +INSERT INTO elasticsearch_products +SELECT * +FROM topic_products; +``` + +Available Metadata +------------------ + +The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. + +<span class="label label-danger">Attention</span> Format metadata fields are only available if the +corresponding connector forwards format metadata. Currently, only the Kafka connector is able to +expose metadata fields for its value format. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Key</th> + <th class="text-center" style="width: 40%">Data Type</th> + <th class="text-center" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><code>table</code></td> + <td><code>STRING NULL </code></td> + <td>Contains fully qualified table name. The format of the fully qualified table name is: + CATALOG NAME.SCHEMA NAME.TABLE NAME</td> + </tr> + <tr> + <td><code>primary_keys</code></td> + <td><code>ARRAY<STRING> NULL</code></td> + <td>An array variable holding the column names of the primary keys of the source table. + The primary_keys field is only include in the JSON output if the includePrimaryKeys + configuration property is set to true.</td> + </tr> + <tr> + <td><code>ingestion-timestamp</code></td> + <td><code>TIMESTAMP_LTZ(6) NULL</code></td> + <td>The timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record.</td> + </tr> + <tr> + <td><code>event-timestamp</code></td> + <td><code>TIMESTAMP_LTZ(6) NULL</code></td> + <td>The timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record.</td> + </tr> + </tbody> +</table> + +The following example shows how to access Ogg metadata fields in Kafka: + +```sql +CREATE TABLE KafkaTable ( + origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, + event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL, + origin_table STRING METADATA FROM 'value.table' VIRTUAL, + primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL, Review comment: ```suggestion primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL, ``` ########## File path: docs/content.zh/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,256 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: +- /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 +该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 + +Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如 + +- 将增量数据从数据库同步到其他系统 +- 日志审计 +- 数据库的实时物化视图 +- 关联维度数据库的变更历史,等等 + +Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 +但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER +分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。 + +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*注意: 请参考 [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html), +了解如何设置 Ogg Kafka handler 来将变更日志同步到 Kafka 的 Topic。* + + +How to use Ogg format +---------------- + +Ogg 为变更日志提供了统一的格式, 这是一个 JSON 格式的从 Oracle `PRODUCTS` 表捕获的更新操作的简单示例: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*注意:请参考 [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) +了解每个字段的含义.* + +Oracle `PRODUCTS` 表 有 4 列 (`id`, `name`, `description` and `weight`). 上面的 JSON 消息是 `PRODUCTS` 表上的一条更新事件,其中 `id = 111` 的行的 +`weight` 值从 `5.18` 更改为 `5.15`. 假设此消息已同步到 Kafka 的 Topic `products_ogg`, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。 + +```sql +CREATE TABLE topic_products ( + -- schema is totally the same to the MySQL "products" table + id BIGINT, + name STRING, + description STRING, + weight DECIMAL(10, 2) +) WITH ( + 'connector' = 'kafka', + 'topic' = 'products_ogg', + 'properties.bootstrap.servers' = 'localhost:9092', + 'properties.group.id' = 'testGroup', + 'format' = 'ogg-json' +) +``` + +再将 Kafka Topic 注册为 Flink 表之后, 可以将 OGG 消息变为变更日志源。 + +```sql +-- a real-time materialized view on the Oracle "PRODUCTS" +-- which calculate the latest average of weight for the same products +SELECT name, AVG(weight) +FROM topic_products +GROUP BY name; + +-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to +-- Elasticsearch "products" index for future searching +INSERT INTO elasticsearch_products +SELECT * +FROM topic_products; +``` + +Available Metadata +------------------ + +The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. 
+ +<span class="label label-danger">Attention</span> Format metadata fields are only available if the +corresponding connector forwards format metadata. Currently, only the Kafka connector is able to +expose metadata fields for its value format. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Key</th> + <th class="text-center" style="width: 40%">Data Type</th> + <th class="text-center" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><code>table</code></td> + <td><code>STRING NULL </code></td> + <td>Contains fully qualified table name. The format of the fully qualified table name is: + CATALOG NAME.SCHEMA NAME.TABLE NAME</td> + </tr> + <tr> + <td><code>primary_keys</code></td> + <td><code>ARRAY<STRING> NULL</code></td> + <td>An array variable holding the column names of the primary keys of the source table. + The primary_keys field is only include in the JSON output if the includePrimaryKeys + configuration property is set to true.</td> + </tr> + <tr> + <td><code>ingestion-timestamp</code></td> + <td><code>TIMESTAMP_LTZ(6) NULL</code></td> + <td>The timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record.</td> + </tr> + <tr> + <td><code>event-timestamp</code></td> + <td><code>TIMESTAMP_LTZ(6) NULL</code></td> + <td>The timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record.</td> + </tr> + </tbody> +</table> + +The following example shows how to access Ogg metadata fields in Kafka: + +```sql +CREATE TABLE KafkaTable ( + origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, + event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL, + origin_table STRING METADATA FROM 'value.table' VIRTUAL, + primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL, Review comment: primary-keys ########## File path: docs/content.zh/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,256 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: +- /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 +该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 + +Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如 + +- 将增量数据从数据库同步到其他系统 +- 日志审计 +- 数据库的实时物化视图 +- 关联维度数据库的变更历史,等等 + +Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 +但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER +分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。 + +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*注意: 请参考 [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html), +了解如何设置 Ogg Kafka handler 来将变更日志同步到 Kafka 的 Topic。* + + +How to use Ogg format +---------------- + +Ogg 为变更日志提供了统一的格式, 这是一个 JSON 格式的从 Oracle `PRODUCTS` 表捕获的更新操作的简单示例: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*注意:请参考 [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) +了解每个字段的含义.* + +Oracle `PRODUCTS` 表 有 4 列 (`id`, `name`, `description` and `weight`). 上面的 JSON 消息是 `PRODUCTS` 表上的一条更新事件,其中 `id = 111` 的行的 +`weight` 值从 `5.18` 更改为 `5.15`. 假设此消息已同步到 Kafka 的 Topic `products_ogg`, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。 + +```sql +CREATE TABLE topic_products ( + -- schema is totally the same to the MySQL "products" table + id BIGINT, + name STRING, + description STRING, + weight DECIMAL(10, 2) +) WITH ( + 'connector' = 'kafka', + 'topic' = 'products_ogg', + 'properties.bootstrap.servers' = 'localhost:9092', + 'properties.group.id' = 'testGroup', + 'format' = 'ogg-json' +) +``` + +再将 Kafka Topic 注册为 Flink 表之后, 可以将 OGG 消息变为变更日志源。 + +```sql +-- a real-time materialized view on the Oracle "PRODUCTS" +-- which calculate the latest average of weight for the same products +SELECT name, AVG(weight) +FROM topic_products +GROUP BY name; + +-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to +-- Elasticsearch "products" index for future searching +INSERT INTO elasticsearch_products +SELECT * +FROM topic_products; +``` + +Available Metadata +------------------ + +The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. + +<span class="label label-danger">Attention</span> Format metadata fields are only available if the +corresponding connector forwards format metadata. Currently, only the Kafka connector is able to +expose metadata fields for its value format. 
+ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Key</th> + <th class="text-center" style="width: 40%">Data Type</th> + <th class="text-center" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><code>table</code></td> + <td><code>STRING NULL </code></td> + <td>Contains fully qualified table name. The format of the fully qualified table name is: + CATALOG NAME.SCHEMA NAME.TABLE NAME</td> + </tr> + <tr> + <td><code>primary_keys</code></td> + <td><code>ARRAY<STRING> NULL</code></td> + <td>An array variable holding the column names of the primary keys of the source table. + The primary_keys field is only include in the JSON output if the includePrimaryKeys + configuration property is set to true.</td> Review comment: ```suggestion <td><code>primary-keys</code></td> <td><code>ARRAY<STRING> NULL</code></td> <td>An array variable holding the column names of the primary keys of the source table. The primary-keys field is only include in the JSON output if the includePrimaryKeys configuration property is set to true.</td> ``` ########## File path: docs/content.zh/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,256 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: +- /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 +该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 + +Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如 + +- 将增量数据从数据库同步到其他系统 +- 日志审计 +- 数据库的实时物化视图 +- 关联维度数据库的变更历史,等等 + +Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 +但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 
因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER +分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。 + +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*注意: 请参考 [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html), +了解如何设置 Ogg Kafka handler 来将变更日志同步到 Kafka 的 Topic。* + + +How to use Ogg format +---------------- + +Ogg 为变更日志提供了统一的格式, 这是一个 JSON 格式的从 Oracle `PRODUCTS` 表捕获的更新操作的简单示例: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*注意:请参考 [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) Review comment: the link is for MySQL ########## File path: docs/content.zh/docs/connectors/table/formats/ogg.md ########## @@ -0,0 +1,256 @@ +--- +title: Ogg +weight: 8 +type: docs +aliases: +- /dev/table/connectors/formats/ogg.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Ogg Format + +{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< +label "Format: Deserialization Schema" >}} + +[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 +该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 + +Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如 + +- 将增量数据从数据库同步到其他系统 +- 日志审计 +- 数据库的实时物化视图 +- 关联维度数据库的变更历史,等等 + +Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 +但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 
因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER +分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。 + +Dependencies +------------ + +#### Ogg Json + +{{< sql_download_table "ogg-json" >}} + +*注意: 请参考 [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html), +了解如何设置 Ogg Kafka handler 来将变更日志同步到 Kafka 的 Topic。* + + +How to use Ogg format +---------------- + +Ogg 为变更日志提供了统一的格式, 这是一个 JSON 格式的从 Oracle `PRODUCTS` 表捕获的更新操作的简单示例: + +```json +{ + "before": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.18 + }, + "after": { + "id": 111, + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": 5.15 + }, + "op_type": "U", + "op_ts": "2020-05-13 15:40:06.000000", + "current_ts": "2020-05-13 15:40:07.000000", + "primary_keys": [ + "id" + ], + "pos": "00000000000000000000143", + "table": "PRODUCTS" +} +``` + +*注意:请参考 [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) +了解每个字段的含义.* + +Oracle `PRODUCTS` 表 有 4 列 (`id`, `name`, `description` and `weight`). 上面的 JSON 消息是 `PRODUCTS` 表上的一条更新事件,其中 `id = 111` 的行的 +`weight` 值从 `5.18` 更改为 `5.15`. 假设此消息已同步到 Kafka 的 Topic `products_ogg`, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。 + +```sql +CREATE TABLE topic_products ( + -- schema is totally the same to the MySQL "products" table + id BIGINT, + name STRING, + description STRING, + weight DECIMAL(10, 2) +) WITH ( + 'connector' = 'kafka', + 'topic' = 'products_ogg', + 'properties.bootstrap.servers' = 'localhost:9092', + 'properties.group.id' = 'testGroup', + 'format' = 'ogg-json' +) +``` + +再将 Kafka Topic 注册为 Flink 表之后, 可以将 OGG 消息变为变更日志源。 + +```sql +-- a real-time materialized view on the Oracle "PRODUCTS" +-- which calculate the latest average of weight for the same products +SELECT name, AVG(weight) +FROM topic_products +GROUP BY name; + +-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to +-- Elasticsearch "products" index for future searching +INSERT INTO elasticsearch_products +SELECT * +FROM topic_products; +``` + +Available Metadata +------------------ + +The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. + +<span class="label label-danger">Attention</span> Format metadata fields are only available if the +corresponding connector forwards format metadata. Currently, only the Kafka connector is able to +expose metadata fields for its value format. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Key</th> + <th class="text-center" style="width: 40%">Data Type</th> + <th class="text-center" style="width: 40%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><code>table</code></td> + <td><code>STRING NULL </code></td> + <td>Contains fully qualified table name. The format of the fully qualified table name is: + CATALOG NAME.SCHEMA NAME.TABLE NAME</td> + </tr> + <tr> + <td><code>primary_keys</code></td> + <td><code>ARRAY<STRING> NULL</code></td> + <td>An array variable holding the column names of the primary keys of the source table. + The primary_keys field is only include in the JSON output if the includePrimaryKeys + configuration property is set to true.</td> + </tr> + <tr> + <td><code>ingestion-timestamp</code></td> + <td><code>TIMESTAMP_LTZ(6) NULL</code></td> + <td>The timestamp at which the connector processed the event. 
+
+Available Metadata
+------------------
+
+The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<span class="label label-danger">Attention</span> Format metadata fields are only available if the
+corresponding connector forwards format metadata. Currently, only the Kafka connector is able to
+expose metadata fields for its value format.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 40%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><code>table</code></td>
+      <td><code>STRING NULL</code></td>
+      <td>Contains the fully qualified table name, in the format
+      CATALOG NAME.SCHEMA NAME.TABLE NAME.</td>
+    </tr>
+    <tr>
+      <td><code>primary_keys</code></td>
+      <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
+      <td>An array holding the column names of the primary keys of the source table.
+      The primary_keys field is only included in the JSON output if the includePrimaryKeys
+      configuration property is set to true.</td>
+    </tr>
+    <tr>
+      <td><code>ingestion-timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(6) NULL</code></td>
+      <td>The timestamp at which the connector processed the event.
+      Corresponds to the current_ts field in the Ogg record.</td>
+    </tr>
+    <tr>
+      <td><code>event-timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(6) NULL</code></td>
+      <td>The timestamp at which the source system created the event.
+      Corresponds to the op_ts field in the Ogg record.</td>
+    </tr>
+  </tbody>
+</table>
+
+The following example shows how to access Ogg metadata fields in Kafka:
+
+```sql
+CREATE TABLE KafkaTable (
+  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
+  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
+  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
+  primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL,

Review comment:
       primary-keys
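To make the review hint above concrete: each metadata key is backed by a field of the raw Ogg JSON record. A simplified, hypothetical mirror of that mapping (the PR's actual `OggJsonDecodingFormat.ReadableMetadata` additionally carries a `DataType` and a converter, as the deserializer below shows) might look like:

```java
/**
 * Simplified sketch of the key-to-JSON-field mapping behind the metadata table above.
 * The reviewer's hint suggests the SQL-facing key may be spelled "primary-keys".
 */
public enum ReadableMetadataSketch {
    TABLE("table", "table"),
    PRIMARY_KEYS("primary_keys", "primary_keys"), // per review: possibly "primary-keys"
    INGESTION_TIMESTAMP("ingestion-timestamp", "current_ts"),
    EVENT_TIMESTAMP("event-timestamp", "op_ts");

    /** Key exposed to SQL as a METADATA column. */
    public final String key;

    /** Field of the raw Ogg JSON record the value is read from. */
    public final String requiredJsonField;

    ReadableMetadataSketch(String key, String requiredJsonField) {
        this.key = key;
        this.requiredJsonField = requiredJsonField;
    }
}
```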
##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.ogg.OggJsonDecodingFormat.ReadableMetadata;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * Deserialization schema from Ogg JSON to Flink Table/SQL internal data structure {@link RowData}.
+ * The deserialization schema knows Ogg's schema definition and can extract the database data and
+ * convert into {@link RowData} with {@link RowKind}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ *
+ * @see <a href="https://www.oracle.com/cn/middleware/technologies/goldengate/overview.html">Ogg</a>
+ */
+@Internal
+public final class OggJsonDeserializationSchema implements DeserializationSchema<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_CREATE = "I"; // insert
+    private static final String OP_UPDATE = "U"; // update
+    private static final String OP_DELETE = "D"; // delete
+    private static final String OP_TRUNCATE = "T"; // truncate
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Ogg Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    /** The deserializer to deserialize Ogg JSON data. */
+    private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+    /** Flag that indicates that an additional projection is required for metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + metadata). */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    public OggJsonDeserializationSchema(
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
+        this.jsonDeserializer =
+                new JsonRowDataDeserializationSchema(
+                        jsonRowType,
+                        // the result type is never used, so it's fine to pass in the produced type
+                        // info
+                        producedTypeInfo,
+                        false, // ignoreParseErrors already contains the functionality of
+                        // failOnMissingField
+                        ignoreParseErrors,
+                        timestampFormat);
+        this.hasMetadata = requestedMetadata.size() > 0;
+        this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
+        this.producedTypeInfo = producedTypeInfo;
+        this.ignoreParseErrors = ignoreParseErrors;
+    }
+
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+        DataType root =
+                DataTypes.ROW(
+                        DataTypes.FIELD("before", physicalDataType),
+                        DataTypes.FIELD("after", physicalDataType),
+                        DataTypes.FIELD("op_type", DataTypes.STRING()));
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> rootMetadataFields =
+                readableMetadata.stream()
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
+    }
+
+    private static MetadataConverter[] createMetadataConverters(
+            RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+        return requestedMetadata.stream()
+                .map(m -> convertInRoot(jsonRowType, m))
+                .toArray(MetadataConverter[]::new);
+    }
+
+    private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
+        final int pos = findFieldPos(metadata, jsonRowType);
+        return new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(GenericRowData root, int unused) {
+                return metadata.converter.convert(root, pos);
+            }
+        };
+    }
+
+    private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
+        return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+    }
+
+    @Override
+    public RowData deserialize(byte[] message) {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+        if (message == null || message.length == 0) {
+            // skip tombstone messages
+            return;
+        }
+        try {
+            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
+
+            GenericRowData before = (GenericRowData) row.getField(0);
+            GenericRowData after = (GenericRowData) row.getField(1);
+            String op = row.getField(2).toString();
+            if (OP_CREATE.equals(op)) {
+                after.setRowKind(RowKind.INSERT);
+                emitRow(row, after, out);
+            } else if (OP_UPDATE.equals(op)) {
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+                }
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                emitRow(row, before, out);
+                emitRow(row, after, out);
+            } else if (OP_DELETE.equals(op)) {
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+                }
+                before.setRowKind(RowKind.DELETE);
+                emitRow(row, before, out);
+            } else if (OP_TRUNCATE.equals(op)) {
+                // flink can't interpret such type of record, skip the record

Review comment:
       print a WARN LOG ?
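A sketch of how the reviewer's suggestion could be applied, written as a self-contained helper rather than a patch to the class above (it assumes the usual SLF4J logging that Flink code uses; the class and method names are illustrative):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Sketch of the reviewer's suggestion: log a WARN instead of silently skipping. */
final class TruncateHandlingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(TruncateHandlingSketch.class);

    /** Returns true if the record should be skipped, warning about the dropped event. */
    static boolean skipUnsupported(String op, String table) {
        if ("T".equals(op)) {
            // Flink's changelog model has no TRUNCATE row kind, so the record is dropped;
            // a WARN makes the data loss visible to operators.
            LOG.warn("Skipping unsupported TRUNCATE record for table {}.", table);
            return true;
        }
        return false;
    }

    private TruncateHandlingSketch() {}
}
```

Logging at WARN, ideally with the table name from the record, makes the silent drop of TRUNCATE events visible without failing the job.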
