yuanoOo commented on code in PR #4061:
URL: https://github.com/apache/flink-cdc/pull/4061#discussion_r2224820535


##########
docs/content/docs/connectors/flink-sources/oceanbase-cdc.md:
##########
@@ -433,434 +498,429 @@ Available Metadata
 The following format metadata can be exposed as read-only (VIRTUAL) columns in 
a table definition.
 
 <table class="colwidths-auto docutils">
-    <thead>
-        <tr>
-            <th class="text-left" style="width: 15%">Key</th>
-            <th class="text-left" style="width: 30%">DataType</th>
-            <th class="text-left" style="width: 55%">Description</th>
-        </tr>
-    </thead>
-    <tbody>
-        <tr>
-            <td>tenant_name</td>
-            <td>STRING</td>
-            <td>Name of the tenant that contains the row.</td>
-        </tr>
-        <tr>
-            <td>database_name</td>
-            <td>STRING</td>
-            <td>Name of the database that contains the row.</td>
-        </tr>
-        <tr>
-            <td>schema_name</td>
-            <td>STRING</td>
-            <td>Name of the schema that contains the row.</td>
-        </tr>
-        <tr>
-            <td>table_name</td>
-            <td>STRING NOT NULL</td>
-            <td>Name of the table that contains the row.</td>
-        </tr>
-        <tr>
-            <td>op_ts</td>
-            <td>TIMESTAMP_LTZ(3) NOT NULL</td>
-            <td>It indicates the time that the change was made in the 
database. <br>
-                If the record is read from snapshot of the table instead of 
the change stream, the value is always 0.</td>
-        </tr>
-    </tbody>
+  <thead>
+     <tr>
+       <th class="text-left" style="width: 15%">Key</th>
+       <th class="text-left" style="width: 30%">DataType</th>
+       <th class="text-left" style="width: 55%">Description</th>
+     </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the table that contains the row.</td>
+    </tr>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the database that contains the row.</td>
+    </tr>
+    <tr>
+      <td>op_ts</td>
+      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
+      <td>It indicates the time that the change was made in the database. <br>If the record is read from the snapshot of the table instead of the change stream, the value is always 0.</td>
+    </tr>
+    <tr>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>It indicates the row kind of the changelog. Note: the downstream SQL operator may fail to compare records correctly due to this newly added column when processing row retractions if
+the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.
+<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
+    </tr>
+  </tbody>
 </table>
 
 The extended CREATE TABLE example demonstrates the syntax for exposing these 
metadata fields:
 
 ```sql
-CREATE TABLE products (
-    tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
-    db_name STRING METADATA FROM 'database_name' VIRTUAL,
-    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
-    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
-    order_id INT,
-    order_date TIMESTAMP(0),
+CREATE TABLE products
+(
+    db_name       STRING METADATA FROM 'database_name' VIRTUAL,
+    table_name    STRING METADATA FROM 'table_name' VIRTUAL,
+    operation_ts  TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation     STRING METADATA FROM 'row_kind' VIRTUAL,
+    order_id      INT,
+    order_date    TIMESTAMP(0),
     customer_name STRING,
-    price DECIMAL(10, 5),
-    product_id INT,
-    order_status BOOLEAN,
-    PRIMARY KEY(order_id) NOT ENFORCED
+    price         DECIMAL(10, 5),
+    product_id    INT,
+    order_status  BOOLEAN,
+    PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
-   'connector' = 'oceanbase-cdc',
-   'scan.startup.mode' = 'initial',
-   'username' = 'user@test_tenant',
-   'password' = 'pswd',
-   'tenant-name' = 'test_tenant',
-   'database-name' = '^test_db$',
-   'table-name' = '^orders$',
-   'hostname' = '127.0.0.1',
-   'port' = '2881',
-   'rootserver-list' = '127.0.0.1:2882:2881',
-   'logproxy.host' = '127.0.0.1',
-   'logproxy.port' = '2983',
-   'working-mode' = 'memory'
-);
+      'connector' = 'oceanbase-cdc',
+      'hostname' = 'localhost',
+      'port' = '2881',
+      'username' = 'root',
+      'password' = '123456',
+      'database-name' = 'mydb',
+      'table-name' = 'orders'
+      );
 ```
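Once declared, these metadata columns behave like ordinary read-only columns. As an illustration, a hypothetical continuous query over the `products` table defined above, counting non-delete changes per source table:

```sql
-- Hypothetical query over the `products` table defined above.
-- `operation` exposes the row kind ('+I', '-U', '+U', '-D') of each change.
SELECT table_name,
       COUNT(*) AS change_cnt
FROM products
WHERE operation <> '-D' -- ignore DELETE messages
GROUP BY table_name;
```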
 
-Features
---------
-
-### At-Least-Once Processing
-
-The OceanBase CDC connector is a Flink Source connector which will read 
database snapshot first and then continues to read change events with 
**at-least-once processing**.
-
-OceanBase is a kind of distributed database whose log files are distributed on 
different servers. As there is no position information like MySQL binlog 
offset, we can only use timestamp as the position mark. In order to ensure the 
completeness of reading data, `liboblog` (a C++ library to read OceanBase log 
record) might read some log data before the given timestamp. So in this way we 
may read duplicate data whose timestamp is around the start point, and only 
'at-least-once' can be guaranteed.
-
-### Startup Reading Position
-
-The config option `scan.startup.mode` specifies the startup mode for OceanBase 
CDC consumer. The valid enumerations are:
-
-- `initial`: Performs an initial snapshot on the monitored table upon first 
startup, and continue to read the latest commit log.
-- `latest-offset`: Never to perform snapshot on the monitored table upon first 
startup and just read the latest commit log since the connector is started.
-- `timestamp`: Never to perform snapshot on the monitored table upon first 
startup and just read the commit log from the given `scan.startup.timestamp`.
-- `snapshot`: Only perform snapshot on the monitored table.
-
-### Consume Commit Log
The following CREATE TABLE example demonstrates the use of regular expressions to match multiple tables:
 
-The OceanBase CDC Connector using 
[oblogclient](https://github.com/oceanbase/oblogclient) to consume commit log 
from OceanBase LogProxy.
-
-### DataStream Source
-
-The OceanBase CDC connector can also be a DataStream source. You can create a 
SourceFunction as the following shows:
-
-```java
-import org.apache.flink.cdc.connectors.base.options.StartupOptions;
-import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
-import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
-public class OceanBaseSourceExample {
-   public static void main(String[] args) throws Exception {
-      SourceFunction<String> oceanBaseSource =
-              OceanBaseSource.<String>builder()
-                      .startupOptions(StartupOptions.initial())
-                      .hostname("127.0.0.1")
-                      .port(2881)
-                      .username("user@test_tenant")
-                      .password("pswd")
-                      .compatibleMode("mysql")
-                      .jdbcDriver("com.mysql.cj.jdbc.Driver")
-                      .tenantName("test_tenant")
-                      .databaseName("^test_db$")
-                      .tableName("^test_table$")
-                      .logProxyHost("127.0.0.1")
-                      .logProxyPort(2983)
-                      .rsList("127.0.0.1:2882:2881")
-                      .serverTimeZone("+08:00")
-                      .deserializer(new JsonDebeziumDeserializationSchema())
-                      .build();
-
-      StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-      // enable checkpoint
-      env.enableCheckpointing(3000);
-
-      env.addSource(oceanBaseSource).print().setParallelism(1);
-      env.execute("Print OceanBase Snapshot + Change Events");
-   }
-}
+```sql
+CREATE TABLE products
+(
+    db_name       STRING METADATA FROM 'database_name' VIRTUAL,
+    table_name    STRING METADATA FROM 'table_name' VIRTUAL,
+    operation_ts  TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation     STRING METADATA FROM 'row_kind' VIRTUAL,
+    order_id      INT,
+    order_date    TIMESTAMP(0),
+    customer_name STRING,
+    price         DECIMAL(10, 5),
+    product_id    INT,
+    order_status  BOOLEAN,
+    PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+      'connector' = 'oceanbase-cdc',
+      'hostname' = 'localhost',
+      'port' = '2881',
+      'username' = 'root',
+      'password' = '123456',
+      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',
+      'table-name' = '(t[5-8]|tt)'
+      );
 ```
+<table class="colwidths-auto docutils">
+  <thead>
+     <tr>
+       <th class="text-left" style="width: 15%">Example</th>
+       <th class="text-left" style="width: 30%">Expression</th>
+       <th class="text-left" style="width: 55%">Description</th>
+     </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>prefix match</td>
+      <td>^(test).*</td>
+      <td>This matches database or table names starting with the prefix 'test', e.g. test1, test2.</td>
+    </tr>
+    <tr>
+      <td>suffix match</td>
+      <td>.*[p$]</td>
+      <td>This matches database or table names ending with the suffix 'p', e.g. cdcp, edcp.</td>
+    </tr>
+    <tr>
+      <td>specific match</td>
+      <td>txc</td>
+      <td>This matches the exact database or table name, e.g. txc.</td>
+    </tr>
+  </tbody>
+</table>
 
-### Available Source metrics
-
-Metrics can help understand the progress of assignments, and the following are 
the supported [Flink 
metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/):
-
-| Group                  | Name                       | Type  | Description    
                                     |
-|------------------------|----------------------------|-------|-----------------------------------------------------|
-| namespace.schema.table | isSnapshotting             | Gauge | Weather the 
table is snapshotting or not            |
-| namespace.schema.table | isStreamReading            | Gauge | Weather the 
table is stream reading or not          |
-| namespace.schema.table | numTablesSnapshotted       | Gauge | The number of 
tables that have been snapshotted     |
-| namespace.schema.table | numTablesRemaining         | Gauge | The number of 
tables that have not been snapshotted |
-| namespace.schema.table | numSnapshotSplitsProcessed | Gauge | The number of 
splits that is being processed        |
-| namespace.schema.table | numSnapshotSplitsRemaining | Gauge | The number of 
splits that have not been processed   |
-| namespace.schema.table | numSnapshotSplitsFinished  | Gauge | The number of 
splits that have been processed       |
-| namespace.schema.table | snapshotStartTime          | Gauge | The time when 
the snapshot started                  |
-| namespace.schema.table | snapshotEndTime            | Gauge | The time when 
the snapshot ended                    |
It will use `database-name\\.table-name` as the pattern to match tables. In the examples above, the pattern `(^(test).*|^(tpc).*|txc|.*[p$]|t{2})\\.(t[5-8]|tt)` matches tables such as txc.tt and test2.t5.
 
-Notice:
-1. The group name is `namespace.schema.table`, where `namespace` is the actual 
database name, `schema` is the actual schema name, and `table` is the actual 
table name.
-2. For OceanBase, the `namespace` will be set to the default value "", and the 
group name will be like `test_database.test_table`.
 
 Data Type Mapping
 ----------------
 
-### Mysql Mode
-
 <div class="wy-table-responsive">
-    <table class="colwidths-auto docutils">
-        <thead>
-            <tr>
-                <th class="text-left">OceanBase type</th>
-                <th class="text-left">Flink SQL type</th>
-                <th class="text-left">NOTE</th>
-            </tr>
-        </thead>
-        <tbody>
-            <tr>
-                <td>BOOLEAN<br>
-                    TINYINT(1)<br>
-                    BIT(1)</td>
-                <td>BOOLEAN</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>TINYINT</td>
-                <td>TINYINT</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    SMALLINT<br>
-                    TINYINT UNSIGNED</td>
-                <td>SMALLINT</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    INT<br>
-                    MEDIUMINT<br>
-                    SMALLINT UNSIGNED</td>
-                <td>INT</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    BIGINT<br>
-                    INT UNSIGNED</td>
-                <td>BIGINT</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>BIGINT UNSIGNED</td>
-                <td>DECIMAL(20, 0)</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    REAL<br>
-                    FLOAT<br>
-                </td>
-                <td>FLOAT</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    DOUBLE
-                </td>
-                <td>DOUBLE</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    NUMERIC(p, s)<br>
-                    DECIMAL(p, s)<br>
-                    where p <= 38<br>
-                </td>
-                <td>DECIMAL(p, s)</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    NUMERIC(p, s)<br>
-                    DECIMAL(p, s)<br>
-                    where 38 < p <=65<br>
-                </td>
-                <td>STRING</td>
-                <td>DECIMAL is equivalent to NUMERIC. The precision for 
DECIMAL data type is up to 65 in OceanBase, but
-                    the precision for DECIMAL is limited to 38 in Flink.
-                    So if you define a decimal column whose precision is 
greater than 38, you should map it to STRING to
-                    avoid precision loss.</td>
-            </tr>
-            <tr>
-                <td>DATE</td>
-                <td>DATE</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>TIME [(p)]</td>
-                <td>TIME [(p)]</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>DATETIME [(p)]</td>
-                <td>TIMESTAMP [(p)]</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>TIMESTAMP [(p)]</td>
-                <td>TIMESTAMP_LTZ [(p)]</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>CHAR(n)</td>
-                <td>CHAR(n)</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>VARCHAR(n)</td>
-                <td>VARCHAR(n)</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>BIT(n)</td>
-                <td>BINARY(⌈(n + 7) / 8⌉)</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>BINARY(n)</td>
-                <td>BINARY(n)</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>VARBINARY(N)</td>
-                <td>VARBINARY(N)</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    TINYTEXT<br>
-                    TEXT<br>
-                    MEDIUMTEXT<br>
-                    LONGTEXT<br>
-                </td>
-                <td>STRING</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>
-                    TINYBLOB<br>
-                    BLOB<br>
-                    MEDIUMBLOB<br>
-                    LONGBLOB<br>
-                </td>
-                <td>BYTES</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>YEAR</td>
-                <td>INT</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>ENUM</td>
-                <td>STRING</td>
-                <td></td>
-            </tr>
-            <tr>
-                <td>SET</td>
-                <td>ARRAY&lt;STRING&gt;</td>
-                <td>As the SET data type in OceanBase is a string object that 
can have zero or more values, it should always be mapped to an array of 
string</td>
-            </tr>
-            <tr>
-                <td>JSON</td>
-                <td>STRING</td>
-                <td>The JSON data type  will be converted into STRING with 
JSON format in Flink.</td>
-            </tr>
-        </tbody>
-    </table>
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width:30%;">OceanBase type<a href="https://en.oceanbase.com/docs/common-oceanbase-database-10000000001974954"></a></th>
+        <th class="text-left" style="width:10%;">Flink SQL type<a href="{% link dev/table/types.md %}"></a></th>
+        <th class="text-left" style="width:60%;">NOTE</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>TINYINT</td>
+      <td>TINYINT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>
+        SMALLINT<br>
+        TINYINT UNSIGNED<br>
+        TINYINT UNSIGNED ZEROFILL
+      </td>
+      <td>SMALLINT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>
+        INT<br>
+        MEDIUMINT<br>
+        SMALLINT UNSIGNED<br>
+        SMALLINT UNSIGNED ZEROFILL
+      </td>
+      <td>INT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>
+        BIGINT<br>
+        INT UNSIGNED<br>
+        INT UNSIGNED ZEROFILL<br>
+        MEDIUMINT UNSIGNED<br>
+        MEDIUMINT UNSIGNED ZEROFILL
+      </td>
+      <td>BIGINT</td>
+      <td></td>
+    </tr>
+   <tr>
+      <td>
+        BIGINT UNSIGNED<br>
+        BIGINT UNSIGNED ZEROFILL<br>
+        SERIAL
+      </td>
+      <td>DECIMAL(20, 0)</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>
+        FLOAT<br>
+        FLOAT UNSIGNED<br>
+        FLOAT UNSIGNED ZEROFILL
+        </td>
+      <td>FLOAT</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>
+        REAL<br>
+        REAL UNSIGNED<br>
+        REAL UNSIGNED ZEROFILL<br>
+        DOUBLE<br>
+        DOUBLE UNSIGNED<br>
+        DOUBLE UNSIGNED ZEROFILL<br>
+        DOUBLE PRECISION<br>
+        DOUBLE PRECISION UNSIGNED<br>
+        DOUBLE PRECISION UNSIGNED ZEROFILL
+      </td>
+      <td>DOUBLE</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>
+        NUMERIC(p, s)<br>
+        NUMERIC(p, s) UNSIGNED<br>
+        NUMERIC(p, s) UNSIGNED ZEROFILL<br>
+        DECIMAL(p, s)<br>
+        DECIMAL(p, s) UNSIGNED<br>
+        DECIMAL(p, s) UNSIGNED ZEROFILL<br>
+        FIXED(p, s)<br>
+        FIXED(p, s) UNSIGNED<br>
+        FIXED(p, s) UNSIGNED ZEROFILL<br>
+        where p <= 38<br>
+      </td>
+      <td>DECIMAL(p, s)</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>
+        NUMERIC(p, s)<br>
+        NUMERIC(p, s) UNSIGNED<br>
+        NUMERIC(p, s) UNSIGNED ZEROFILL<br>
+        DECIMAL(p, s)<br>
+        DECIMAL(p, s) UNSIGNED<br>
+        DECIMAL(p, s) UNSIGNED ZEROFILL<br>
+        FIXED(p, s)<br>
+        FIXED(p, s) UNSIGNED<br>
+        FIXED(p, s) UNSIGNED ZEROFILL<br>
+        where 38 < p <= 65<br>
+      </td>
+      <td>STRING</td>
+      <td>The precision for DECIMAL data type is up to 65 in OceanBase, but 
the precision for DECIMAL is limited to 38 in Flink.
+  So if you define a decimal column whose precision is greater than 38, you 
should map it to STRING to avoid precision loss.</td>

Review Comment:
   This is copied from the mysql-cdc document. It is best to keep it consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
