leonardBang commented on a change in pull request #18738:
URL: https://github.com/apache/flink/pull/18738#discussion_r806704754



##########
File path: docs/content/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,270 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+  - /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization 
Schema" >}} {{<
+label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a 
ogg) is a managed service
+providing a real-time data mesh platform, which uses replication to keep data 
highly available, and
+enabling real-time analysis. Customers can design, execute, and monitor their 
data replication and
+stream data processing solutions without the need to allocate or manage 
compute environments. Ogg
+provides a format schema for changelog and supports to serialize messages 
using JSON.
+
+Flink supports interpreting Ogg JSON as INSERT/UPDATE/DELETE messages into the Flink SQL system. This
+feature is useful in many cases, such as
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on.
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON and emitting
+them to external systems like Kafka. However, currently Flink can't combine UPDATE_BEFORE and
+UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER
+as DELETE and INSERT Ogg messages.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer
+to the [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs; here is a simple example of an update operation
+captured from an Oracle `PRODUCTS` table in JSON format:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer
+to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)

Review comment:
       hint: please update the link
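       For example, the Debezium Oracle connector page would likely be a better fit here (assuming the current stable docs path): https://debezium.io/documentation/reference/stable/connectors/oracle.html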

##########
File path: docs/content/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,270 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+  - /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service

Review comment:
       ```suggestion
   [Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a ogg) is a managed service
   ```

##########
File path: docs/content.zh/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,256 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+- /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a comprehensive software package for real-time data integration and replication across heterogeneous IT environments.
+The product suite supports high-availability solutions, real-time data integration, transactional change data capture, and data replication, transformation, and verification between operational and analytical enterprise systems. Ogg provides a unified format structure for changelogs and supports serializing messages as JSON.
+
+Flink supports interpreting Ogg JSON messages as INSERT/UPDATE/DELETE messages in the Flink SQL system. This feature is useful in many cases, such as
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON messages and emitting them to storage systems such as Kafka.
+Note, however, that Flink currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink
+encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages, respectively.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer to the [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs; here is a simple example, in JSON format, of an update operation captured from an Oracle `PRODUCTS` table:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer to the [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)
+for the meaning of each field.*
+
+The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON message is an update event on the `PRODUCTS` table where the
+`weight` value of the row with `id = 111` changed from `5.18` to `5.15`. Assuming this message has been synchronized to the Kafka topic `products_ogg`, the following
+DDL can be used to consume this topic and interpret the change events.
+
+```sql
+CREATE TABLE topic_products (
+  -- schema is totally the same to the MySQL "products" table

Review comment:
       ```suggestion
     -- schema is totally the same to the Oracle "products" table
   ```

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDecodingFormat.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.ogg.OggJsonDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Ogg using JSON encoding. */
+public class OggJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    // --------------------------------------------------------------------------------------------
+    // Ogg-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final boolean ignoreParseErrors;
+    private final TimestampFormat timestampFormat;
+
+    public OggJsonDecodingFormat(boolean ignoreParseErrors, TimestampFormat timestampFormat) {
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType) {
+
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new OggJsonDeserializationSchema(
+                physicalDataType,
+                readableMetadata,
+                producedTypeInfo,
+                ignoreParseErrors,
+                timestampFormat);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    /** List of metadata that can be read with this format. */
+    enum ReadableMetadata {
+        TABLE(
+                "table",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("table", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+
+        PRIMARY_KEYS(
+                "primary_keys",

Review comment:
       ```suggestion
                   "primary-keys",
   ```
   We use '-' in Flink instead of '_', like `ingestion-timestamp` and `event-timestamp`
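   With that rename, the consuming DDL would reference the new key. A quick sketch (the table name `ogg_metadata_demo` is just for illustration; the connector options mirror the Kafka example in these docs):
   ```sql
   CREATE TABLE ogg_metadata_demo (
     id BIGINT,
     -- the SQL column name can keep '_'; only the metadata key itself uses '-'
     primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL
   ) WITH (
     'connector' = 'kafka',
     'topic' = 'products_ogg',
     'properties.bootstrap.servers' = 'localhost:9092',
     'format' = 'ogg-json'
   );
   ```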

##########
File path: docs/content/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,270 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+  - /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service
+providing a real-time data mesh platform, which uses replication to keep data highly available and
+to enable real-time analysis. Customers can design, execute, and monitor their data replication and
+stream data processing solutions without the need to allocate or manage compute environments. Ogg
+provides a format schema for changelogs and supports serializing messages as JSON.
+
+Flink supports interpreting Ogg JSON as INSERT/UPDATE/DELETE messages into the Flink SQL system. This
+feature is useful in many cases, such as
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on.
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON and emitting
+them to external systems like Kafka. However, currently Flink can't combine UPDATE_BEFORE and
+UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER
+as DELETE and INSERT Ogg messages.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer
+to the [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs; here is a simple example of an update operation
+captured from an Oracle `PRODUCTS` table in JSON format:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer
+to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)
+about the meaning of each field.*
+
+The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON
+message is an update change event on the `PRODUCTS` table where the `weight` value of the row
+with `id = 111` is changed from `5.18` to `5.15`. Assuming this message is synchronized to Kafka
+topic `products_ogg`, we can use the following DDL to consume this topic and interpret the
+change events.
+
+```sql
+CREATE TABLE topic_products (
+  -- schema is totally the same to the MySQL "products" table

Review comment:
       ```suggestion
     -- schema is totally the same to the Oracle "products" table
   ```

##########
File path: docs/content/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,270 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+  - /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service
+providing a real-time data mesh platform, which uses replication to keep data highly available and
+to enable real-time analysis. Customers can design, execute, and monitor their data replication and
+stream data processing solutions without the need to allocate or manage compute environments. Ogg
+provides a format schema for changelogs and supports serializing messages as JSON.
+
+Flink supports interpreting Ogg JSON as INSERT/UPDATE/DELETE messages into the Flink SQL system. This
+feature is useful in many cases, such as
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on.
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON and emitting
+them to external systems like Kafka. However, currently Flink can't combine UPDATE_BEFORE and
+UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER
+as DELETE and INSERT Ogg messages.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer
+to the [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs; here is a simple example of an update operation
+captured from an Oracle `PRODUCTS` table in JSON format:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer
+to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)
+about the meaning of each field.*
+
+The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON
+message is an update change event on the `PRODUCTS` table where the `weight` value of the row
+with `id = 111` is changed from `5.18` to `5.15`. Assuming this message is synchronized to Kafka
+topic `products_ogg`, we can use the following DDL to consume this topic and interpret the
+change events.
+
+```sql
+CREATE TABLE topic_products (
+  -- schema is totally the same to the MySQL "products" table
+  id BIGINT,
+  name STRING,
+  description STRING,
+  weight DECIMAL(10, 2)
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products_ogg',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'ogg-json'
+)
+```
+
+After registering the topic as a Flink table, you can consume the Ogg messages as a changelog
+source.
+
+```sql
+-- a real-time materialized view on the Oracle "PRODUCTS"
+-- which calculates the latest average weight for the same products
+SELECT name, AVG(weight)
+FROM topic_products
+GROUP BY name;
+
+-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
+-- Elasticsearch "products" index for future searching
+INSERT INTO elasticsearch_products
+SELECT *
+FROM topic_products;
+```
+
+Available Metadata
+------------------
+
+The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<span class="label label-danger">Attention</span> Format metadata fields are only available if the
+corresponding connector forwards format metadata. Currently, only the Kafka connector is able to
+expose metadata fields for its value format.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 40%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>table</code></td>
+      <td><code>STRING NULL </code></td>
+      <td>Contains the fully qualified table name. The format of the fully qualified table name is:
+        CATALOG NAME.SCHEMA NAME.TABLE NAME</td>
+    </tr>
+    <tr>
+      <td><code>primary_keys</code></td>
+      <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
+      <td>An array variable holding the column names of the primary keys of the source table.
+        The primary_keys field is only included in the JSON output if the includePrimaryKeys
+        configuration property is set to true.</td>
+    </tr>

Review comment:
       ```suggestion
         <td><code>primary-keys</code></td>
         <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
         <td>An array variable holding the column names of the primary keys of the source table.
           The primary-keys field is only included in the JSON output if the includePrimaryKeys
           configuration property is set to true.</td>
       </tr>
   ```

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.ogg.OggJsonDecodingFormat.ReadableMetadata;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * Deserialization schema from Ogg JSON to Flink Table/SQL internal data structure {@link RowData}.
+ * The deserialization schema knows Ogg's schema definition and can extract the database data and
+ * convert it into {@link RowData} with {@link RowKind}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ *
+ * @see <a href="https://www.oracle.com/cn/middleware/technologies/goldengate/overview.html">Ogg</a>
+ */
+@Internal
+public final class OggJsonDeserializationSchema implements DeserializationSchema<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_CREATE = "I"; // insert
+    private static final String OP_UPDATE = "U"; // update
+    private static final String OP_DELETE = "D"; // delete
+    private static final String OP_TRUNCATE = "T"; // truncate
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Ogg Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA 
IDENTITY to FULL level.";
+
+    /** The deserializer to deserialize Ogg JSON data. */
+    private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+    /** Flag that indicates that an additional projection is required for metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + metadata). */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    public OggJsonDeserializationSchema(
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
+        this.jsonDeserializer =
+                new JsonRowDataDeserializationSchema(
+                        jsonRowType,
+                        // the result type is never used, so it's fine to pass in the produced type
+                        // info
+                        producedTypeInfo,
+                        // ignoreParseErrors already contains the functionality of failOnMissingField
+                        false,
+                        ignoreParseErrors,
+                        timestampFormat);
+        this.hasMetadata = requestedMetadata.size() > 0;
+        this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
+        this.producedTypeInfo = producedTypeInfo;
+        this.ignoreParseErrors = ignoreParseErrors;
+    }
+
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+        DataType root =
+                DataTypes.ROW(
+                        DataTypes.FIELD("before", physicalDataType),
+                        DataTypes.FIELD("after", physicalDataType),
+                        DataTypes.FIELD("op_type", DataTypes.STRING()));
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> rootMetadataFields =
+                readableMetadata.stream()
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
+    }
+
+    private static MetadataConverter[] createMetadataConverters(
+            RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+        return requestedMetadata.stream()
+                .map(m -> convertInRoot(jsonRowType, m))
+                .toArray(MetadataConverter[]::new);
+    }
+
+    private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
+        final int pos = findFieldPos(metadata, jsonRowType);
+        return new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(GenericRowData root, int unused) {
+                return metadata.converter.convert(root, pos);
+            }
+        };
+    }
+
+    private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
+        return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+    }
+
+    @Override
+    public RowData deserialize(byte[] message) {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+        if (message == null || message.length == 0) {
+            // skip tombstone messages
+            return;
+        }
+        try {
+            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
+
+            GenericRowData before = (GenericRowData) row.getField(0);
+            GenericRowData after = (GenericRowData) row.getField(1);
+            String op = row.getField(2).toString();
+            if (OP_CREATE.equals(op)) {
+                after.setRowKind(RowKind.INSERT);
+                emitRow(row, after, out);
+            } else if (OP_UPDATE.equals(op)) {
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+                }
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                emitRow(row, before, out);
+                emitRow(row, after, out);
+            } else if (OP_DELETE.equals(op)) {
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+                }
+                before.setRowKind(RowKind.DELETE);
+                emitRow(row, before, out);
+            } else if (OP_TRUNCATE.equals(op)) {
+                // flink can't interpret such type of record, skip the record

Review comment:
       print a WARN LOG ?
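   A minimal sketch of how that could look (assuming an SLF4J logger is added to OggJsonDeserializationSchema, as is usual in Flink classes):
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // new field on OggJsonDeserializationSchema
   private static final Logger LOG =
           LoggerFactory.getLogger(OggJsonDeserializationSchema.class);

   // in deserialize(byte[], Collector<RowData>):
   } else if (OP_TRUNCATE.equals(op)) {
       // Flink can't represent TRUNCATE in a changelog, so skip the record but leave a trace
       LOG.warn("Skipping TRUNCATE record, Flink can't interpret this operation type.");
   }
   ```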

##########
File path: docs/content.zh/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,256 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+- /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a comprehensive software package for real-time data integration and replication across heterogeneous IT environments.

Review comment:
       ```suggestion
   [Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a ogg) is a comprehensive software package for real-time data integration and replication across heterogeneous IT environments.
   ```

##########
File path: docs/content/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,270 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+  - /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a managed service
+providing a real-time data mesh platform, which uses replication to keep data highly available and
+to enable real-time analysis. Customers can design, execute, and monitor their data replication and
+stream data processing solutions without the need to allocate or manage compute environments. Ogg
+provides a format schema for changelogs and supports serializing messages as JSON.
+
+Flink supports interpreting Ogg JSON as INSERT/UPDATE/DELETE messages into the Flink SQL system. This
+feature is useful in many cases, such as
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on.
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON and emitting
+them to external systems like Kafka. However, currently Flink can't combine UPDATE_BEFORE and
+UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER
+as DELETE and INSERT Ogg messages.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer
+to the [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs; here is a simple example of an update operation
+captured from an Oracle `PRODUCTS` table in JSON format:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer
+to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)
+about the meaning of each field.*
+
+The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON
+message is an update change event on the `PRODUCTS` table where the `weight` value of the row
+with `id = 111` is changed from `5.18` to `5.15`. Assuming this message is synchronized to Kafka
+topic `products_ogg`, we can use the following DDL to consume this topic and interpret the
+change events.
+
+```sql
+CREATE TABLE topic_products (
+  -- schema is totally the same to the MySQL "products" table
+  id BIGINT,
+  name STRING,
+  description STRING,
+  weight DECIMAL(10, 2)
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products_ogg',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'ogg-json'
+)
+```
+
+After registering the topic as a Flink table, you can consume the Ogg messages as a changelog
+source.
+
+```sql
+-- a real-time materialized view on the Oracle "PRODUCTS"
+-- which calculates the latest average weight for the same products
+SELECT name, AVG(weight)
+FROM topic_products
+GROUP BY name;
+
+-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
+-- Elasticsearch "products" index for future searching
+INSERT INTO elasticsearch_products
+SELECT *
+FROM topic_products;
+```
+
+Available Metadata
+------------------
+
+The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<span class="label label-danger">Attention</span> Format metadata fields are only available if the
+corresponding connector forwards format metadata. Currently, only the Kafka connector is able to
+expose metadata fields for its value format.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 40%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>table</code></td>
+      <td><code>STRING NULL </code></td>
+      <td>Contains the fully qualified table name. The format of the fully qualified table name is:
+        CATALOG NAME.SCHEMA NAME.TABLE NAME</td>
+    </tr>
+    <tr>
+      <td><code>primary_keys</code></td>
+      <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
+      <td>An array variable holding the column names of the primary keys of the source table.
+        The primary_keys field is only included in the JSON output if the includePrimaryKeys
+        configuration property is set to true.</td>
+    </tr>
+    <tr>
+      <td><code>ingestion-timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(6) NULL</code></td>
+      <td>The timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record.</td>
+    </tr>
+    <tr>
+      <td><code>event-timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(6) NULL</code></td>
+      <td>The timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record.</td>
+    </tr>
+    </tbody>
+</table>
+
+The following example shows how to access Ogg metadata fields in Kafka:
+
+```sql
+CREATE TABLE KafkaTable (
+  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
+  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
+  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
+  primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL,

Review comment:
       ```suggestion
     primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
   ```

##########
File path: docs/content.zh/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,256 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+- /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a comprehensive software package for real-time data integration and replication across heterogeneous IT environments.
+The product suite supports high-availability solutions, real-time data integration, transactional change data capture, and data replication, transformation, and verification between operational and analytical enterprise systems. Ogg provides a unified format structure for changelogs and supports serializing messages as JSON.
+
+Flink supports interpreting Ogg JSON messages as INSERT/UPDATE/DELETE messages in the Flink SQL system. This feature is useful in many cases, such as
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON messages and emitting them to storage systems such as Kafka.
+Note, however, that Flink currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink
+encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages, respectively.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer to the [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs; here is a simple example, in JSON format, of an update operation captured from an Oracle `PRODUCTS` table:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer to the [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)
+for the meaning of each field.*
+
+The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON message is an update event on the `PRODUCTS` table where the
+`weight` value of the row with `id = 111` changed from `5.18` to `5.15`. Assuming this message has been synchronized to the Kafka topic `products_ogg`, the following
+DDL can be used to consume this topic and interpret the change events.
+
+```sql
+CREATE TABLE topic_products (
+  -- schema is totally the same to the MySQL "products" table
+  id BIGINT,
+  name STRING,
+  description STRING,
+  weight DECIMAL(10, 2)
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products_ogg',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'ogg-json'
+)
+```
+
+After registering the Kafka topic as a Flink table, you can consume the Ogg messages as a changelog source.
+
+```sql
+-- a real-time materialized view on the Oracle "PRODUCTS"
+-- which calculates the latest average weight for the same products
+SELECT name, AVG(weight)
+FROM topic_products
+GROUP BY name;
+
+-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
+-- Elasticsearch "products" index for future searching
+INSERT INTO elasticsearch_products
+SELECT *
+FROM topic_products;
+```
+
+Available Metadata
+------------------
+
+The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<span class="label label-danger">Attention</span> Format metadata fields are only available if the
+corresponding connector forwards format metadata. Currently, only the Kafka connector is able to
+expose metadata fields for its value format.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 40%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>table</code></td>
+      <td><code>STRING NULL </code></td>
+      <td>Contains the fully qualified table name. The format of the fully qualified table name is:
+        CATALOG NAME.SCHEMA NAME.TABLE NAME</td>
+    </tr>
+    <tr>
+      <td><code>primary_keys</code></td>
+      <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
+      <td>An array variable holding the column names of the primary keys of the source table.
+        The primary_keys field is only included in the JSON output if the includePrimaryKeys
+        configuration property is set to true.</td>
+    </tr>
+    <tr>
+      <td><code>ingestion-timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(6) NULL</code></td>
+      <td>The timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record.</td>
+    </tr>
+    <tr>
+      <td><code>event-timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(6) NULL</code></td>
+      <td>The timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record.</td>
+    </tr>
+    </tbody>
+</table>
+
+The following example shows how to access Ogg metadata fields in Kafka:
+
+```sql
+CREATE TABLE KafkaTable (
+  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
+  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
+  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
+  primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL,

Review comment:
       primary-keys
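       i.e. the same one-line fix as suggested on the English page:
   ```suggestion
     primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
   ```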

##########
File path: docs/content.zh/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,256 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+- /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a comprehensive software package for real-time data integration and replication across heterogeneous IT environments.
+The product suite supports high-availability solutions, real-time data integration, transactional change data capture, and data replication, transformation, and verification between operational and analytical enterprise systems. Ogg provides a unified format structure for changelogs and supports serializing messages as JSON.
+
+Flink supports interpreting Ogg JSON messages as INSERT/UPDATE/DELETE messages in the Flink SQL system. This feature is useful in many cases, such as
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON messages and emitting them to storage systems such as Kafka.
+Note, however, that Flink currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink
+encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages, respectively.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer to the [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs; here is a simple example, in JSON format, of an update operation captured from an Oracle `PRODUCTS` table:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer to the [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)
+for the meaning of each field.*
+
+The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON message is an update event on the `PRODUCTS` table where the
+`weight` value of the row with `id = 111` changed from `5.18` to `5.15`. Assuming this message has been synchronized to the Kafka topic `products_ogg`, the following
+DDL can be used to consume this topic and interpret the change events.
+
+```sql
+CREATE TABLE topic_products (
+  -- schema is totally the same to the MySQL "products" table
+  id BIGINT,
+  name STRING,
+  description STRING,
+  weight DECIMAL(10, 2)
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products_ogg',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'ogg-json'
+)
+```
+
+After registering the Kafka topic as a Flink table, you can consume the Ogg messages as a changelog source.
+
+```sql
+-- a real-time materialized view on the Oracle "PRODUCTS"
+-- which calculates the latest average weight for the same products
+SELECT name, AVG(weight)
+FROM topic_products
+GROUP BY name;
+
+-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
+-- Elasticsearch "products" index for future searching
+INSERT INTO elasticsearch_products
+SELECT *
+FROM topic_products;
+```
+
+Available Metadata
+------------------
+
+The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<span class="label label-danger">Attention</span> Format metadata fields are only available if the
+corresponding connector forwards format metadata. Currently, only the Kafka connector is able to
+expose metadata fields for its value format.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 40%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>table</code></td>
+      <td><code>STRING NULL </code></td>
+      <td>Contains the fully qualified table name. The format of the fully qualified table name is:
+        CATALOG NAME.SCHEMA NAME.TABLE NAME</td>
+    </tr>
+    <tr>
+      <td><code>primary_keys</code></td>
+      <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
+      <td>An array variable holding the column names of the primary keys of the source table.
+        The primary_keys field is only included in the JSON output if the includePrimaryKeys
+        configuration property is set to true.</td>

Review comment:
       ```suggestion
         <td><code>primary-keys</code></td>
         <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
         <td>An array variable holding the column names of the primary keys of the source table.
           The primary-keys field is only included in the JSON output if the includePrimaryKeys
           configuration property is set to true.</td>
   ```

##########
File path: docs/content.zh/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,256 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+- /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization Schema" >}} {{< label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a comprehensive software package for real-time data integration and replication across heterogeneous IT environments.
+The product suite supports high-availability solutions, real-time data integration, transactional change data capture, and data replication, transformation, and verification between operational and analytical enterprise systems. Ogg provides a unified format structure for changelogs and supports serializing messages as JSON.
+
+Flink supports interpreting Ogg JSON messages as INSERT/UPDATE/DELETE messages in the Flink SQL system. This feature is useful in many cases, such as
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal joins against the change history of a database table, and so on
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON messages and emitting them to storage systems such as Kafka.
+Note, however, that Flink currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink
+encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages, respectively.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer to the [Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs; here is a simple example, in JSON format, of an update operation captured from an Oracle `PRODUCTS` table:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer to the [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)

Review comment:
       the link is for MySQL

##########
File path: docs/content.zh/docs/connectors/table/formats/ogg.md
##########
@@ -0,0 +1,256 @@
+---
+title: Ogg
+weight: 8
+type: docs
+aliases:
+- /dev/table/connectors/formats/ogg.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Ogg Format
+
+{{< label "Changelog-Data-Capture Format" >}} {{< label "Format: Serialization 
Schema" >}} {{<
+label "Format: Deserialization Schema" >}}
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate//) (a.k.a ogg) is a comprehensive software
+package for real-time data integration and replication across heterogeneous IT environments. The product set
+supports high availability solutions, real-time data integration, transactional change data capture, and data
+replication, transformation, and verification between operational and analytical enterprise systems. Ogg provides
+a unified format structure for changelogs and supports serializing messages using JSON.
+
+Flink supports interpreting Ogg JSON messages as INSERT/UPDATE/DELETE messages into the Flink SQL system. This
+feature is useful in many cases, for example:
+
+- synchronizing incremental data from databases to other systems
+- auditing logs
+- real-time materialized views on databases
+- temporal join on the change history of a database table, and so on
+
+Flink also supports encoding the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON messages and emitting
+them to external storage such as Kafka. Note, however, that Flink currently cannot combine UPDATE_BEFORE and
+UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and
+INSERT Ogg messages, respectively.
+
+Dependencies
+------------
+
+#### Ogg Json
+
+{{< sql_download_table "ogg-json" >}}
+
+*Note: please refer to the
+[Ogg Kafka Handler documentation](https://docs.oracle.com/en/middleware/goldengate/big-data/19.1/gadbd/using-kafka-handler.html)
+for how to set up an Ogg Kafka handler to synchronize changelogs to Kafka topics.*
+
+
+How to use Ogg format
+----------------
+
+Ogg provides a unified format for changelogs. Here is a simple example of an update operation captured from an Oracle `PRODUCTS` table in JSON format:
+
+```json
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+*Note: please refer to the [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium)
+for the meaning of each field.*
+
+The Oracle `PRODUCTS` table has 4 columns (`id`, `name`, `description` and `weight`). The JSON message above is an
+update event on the `PRODUCTS` table where the `weight` value of the row with `id = 111` changed from `5.18` to
+`5.15`. Assuming this message has been synchronized to the Kafka topic `products_ogg`, the following DDL can be
+used to consume this topic and interpret the update events.
+
+```sql
+CREATE TABLE topic_products (
+  -- schema is totally the same as the Oracle "PRODUCTS" table
+  id BIGINT,
+  name STRING,
+  description STRING,
+  weight DECIMAL(10, 2)
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products_ogg',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'ogg-json'
+)
+```
+
+After registering the Kafka topic as a Flink table, you can consume the Ogg messages as a changelog source.
+
+```sql
+-- a real-time materialized view on the Oracle "PRODUCTS" table
+-- which calculates the latest average weight for the same products
+SELECT name, AVG(weight)
+FROM topic_products
+GROUP BY name;
+
+-- synchronize all the data and incremental changes of the Oracle "PRODUCTS" table
+-- to the Elasticsearch "products" index for future searching
+INSERT INTO elasticsearch_products
+SELECT *
+FROM topic_products;
+```
+
+Available Metadata
+------------------
+
+The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<span class="label label-danger">Attention</span> Format metadata fields are only available if the
+corresponding connector forwards format metadata. Currently, only the Kafka connector is able to
+expose metadata fields for its value format.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 40%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>table</code></td>
+      <td><code>STRING NULL </code></td>
+      <td>Contains the fully qualified table name. The format of the fully qualified table name is:
+        CATALOG NAME.SCHEMA NAME.TABLE NAME</td>
+    </tr>
+    <tr>
+      <td><code>primary_keys</code></td>
+      <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
+      <td>An array variable holding the column names of the primary keys of the source table.
+        The primary_keys field is only included in the JSON output if the includePrimaryKeys
+        configuration property is set to true.</td>
+    </tr>
+    <tr>
+      <td><code>ingestion-timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(6) NULL</code></td>
+      <td>The timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record.</td>
+    </tr>
+    <tr>
+      <td><code>event-timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(6) NULL</code></td>
+      <td>The timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record.</td>
+    </tr>
+    </tbody>
+</table>
+
+The following example shows how to access Ogg metadata fields in Kafka:
+
+```sql
+CREATE TABLE KafkaTable (
+  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
+  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
+  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
+  primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL,

Review comment:
       primary-keys
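
For context, a complete version of the DDL above with the suggested metadata key applied could look like the following sketch. The physical columns and connector options mirror the earlier `topic_products` example; the `value.primary-keys` key assumes the suggested rename from `value.primary_keys` is adopted:

```sql
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  -- assumes the suggested rename from 'value.primary_keys' is adopted
  primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_ogg',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'ogg-json'
);
```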

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.ogg;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.ogg.OggJsonDecodingFormat.ReadableMetadata;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * Deserialization schema from Ogg JSON to Flink Table/SQL internal data structure {@link RowData}.
+ * The deserialization schema knows Ogg's schema definition and can extract the database data and
+ * convert it into {@link RowData} with {@link RowKind}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ *
+ * @see <a href="https://www.oracle.com/cn/middleware/technologies/goldengate/overview.html">Ogg</a>
+ */
+@Internal
+public final class OggJsonDeserializationSchema implements DeserializationSchema<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String OP_CREATE = "I"; // insert
+    private static final String OP_UPDATE = "U"; // update
+    private static final String OP_DELETE = "D"; // delete
+    private static final String OP_TRUNCATE = "T"; // truncate
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Ogg Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA 
IDENTITY to FULL level.";
+
+    /** The deserializer to deserialize Ogg JSON data. */
+    private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+    /** Flag that indicates that an additional projection is required for metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + metadata). */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    public OggJsonDeserializationSchema(
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
+        this.jsonDeserializer =
+                new JsonRowDataDeserializationSchema(
+                        jsonRowType,
+                        // the result type is never used, so it's fine to pass in the produced type
+                        // info
+                        producedTypeInfo,
+                        false, // ignoreParseErrors already contains the functionality of
+                        // failOnMissingField
+                        ignoreParseErrors,
+                        timestampFormat);
+        this.hasMetadata = requestedMetadata.size() > 0;
+        this.metadataConverters = createMetadataConverters(jsonRowType, 
requestedMetadata);
+        this.producedTypeInfo = producedTypeInfo;
+        this.ignoreParseErrors = ignoreParseErrors;
+    }
+
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+        DataType root =
+                DataTypes.ROW(
+                        DataTypes.FIELD("before", physicalDataType),
+                        DataTypes.FIELD("after", physicalDataType),
+                        DataTypes.FIELD("op_type", DataTypes.STRING()));
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> rootMetadataFields =
+                readableMetadata.stream()
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
+    }
+
+    private static MetadataConverter[] createMetadataConverters(
+            RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+        return requestedMetadata.stream()
+                .map(m -> convertInRoot(jsonRowType, m))
+                .toArray(MetadataConverter[]::new);
+    }
+
+    private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
+        final int pos = findFieldPos(metadata, jsonRowType);
+        return new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(GenericRowData root, int unused) {
+                return metadata.converter.convert(root, pos);
+            }
+        };
+    }
+
+    private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
+        return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+    }
+
+    @Override
+    public RowData deserialize(byte[] message) {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], 
Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+        if (message == null || message.length == 0) {
+            // skip tombstone messages
+            return;
+        }
+        try {
+            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
+
+            GenericRowData before = (GenericRowData) row.getField(0);
+            GenericRowData after = (GenericRowData) row.getField(1);
+            String op = row.getField(2).toString();
+            if (OP_CREATE.equals(op)) {
+                after.setRowKind(RowKind.INSERT);
+                emitRow(row, after, out);
+            } else if (OP_UPDATE.equals(op)) {
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+                }
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                emitRow(row, before, out);
+                emitRow(row, after, out);
+            } else if (OP_DELETE.equals(op)) {
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+                }
+                before.setRowKind(RowKind.DELETE);
+                emitRow(row, before, out);
+            } else if (OP_TRUNCATE.equals(op)) {
+                // Flink cannot interpret this type of record, so skip it

Review comment:
       print a WARN log?
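
A minimal, self-contained sketch of what such a WARN log could look like, assuming an SLF4J logger (Flink bundles SLF4J, but the `LOG` field and the `handleOp` helper below are hypothetical stand-ins for the TRUNCATE branch, not part of the PR):

```java
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// A sketch only: the class and method are illustrative stand-ins for the
// TRUNCATE branch in OggJsonDeserializationSchema#deserialize.
public final class TruncateWarnSketch {

    private static final Logger LOG = LoggerFactory.getLogger(TruncateWarnSketch.class);

    private static final String OP_TRUNCATE = "T"; // truncate, as defined in the PR

    /** Handles a parsed op_type; only the TRUNCATE branch is shown. */
    static void handleOp(String op, byte[] message) {
        if (OP_TRUNCATE.equals(op)) {
            // Flink's changelog model has no TRUNCATE, so the record is skipped;
            // a WARN makes the silent drop visible to operators.
            LOG.warn(
                    "Ignored TRUNCATE record: {}",
                    new String(message, StandardCharsets.UTF_8));
        }
    }
}
```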




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

