This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f529bfb7b [INLONG-8745][Sort] Add incremental and PostGre type in 
postgre connector (#8746)
5f529bfb7b is described below

commit 5f529bfb7bebc24a48e53ea7d8f2c69d3df7b943
Author: Sting <[email protected]>
AuthorDate: Thu Aug 17 10:17:53 2023 +0800

    [INLONG-8745][Sort] Add incremental and PostGre type in postgre connector 
(#8746)
---
 .../inlong/sort/cdc/base/util/MetaDataUtil.java    | 69 ++++++++++++++++++++++
 .../inlong/sort/cdc/mysql/utils/MetaDataUtils.java | 31 +---------
 .../cdc/oracle/table/OracleReadableMetaData.java   | 36 ++---------
 .../postgres/table/PostgreSQLReadableMetaData.java |  9 +++
 .../inlong/sort/formats/json/canal/CanalJson.java  |  6 +-
 5 files changed, 90 insertions(+), 61 deletions(-)

diff --git 
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/MetaDataUtil.java
 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/MetaDataUtil.java
new file mode 100644
index 0000000000..247fd401fd
--- /dev/null
+++ 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/MetaDataUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * This class handles the metadata of a cdc connector
+ */
+public class MetaDataUtil {
+
+    private static final String FORMAT_PRECISION = "%s(%d)";
+    private static final String FORMAT_PRECISION_SCALE = "%s(%d, %d)";
+    private static final String REGEX_FORMATTED = "\\w.+\\([\\d ,]+\\)";
+
+    /**
+     * get a map about column name and type
+     * @param tableSchema
+     * @return map of field name and field type
+     */
+    public static Map<String, String> getType(@Nullable 
TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        Map<String, String> oracleType = new LinkedHashMap<>();
+        final Table table = tableSchema.getTable();
+        for (Column column : table.columns()) {
+            // The typeName contains precision and does not need to be 
formatted.
+            if (column.typeName().matches(REGEX_FORMATTED)) {
+                oracleType.put(column.name(), column.typeName());
+                continue;
+            }
+            if (column.scale().isPresent()) {
+                oracleType.put(
+                        column.name(),
+                        String.format(FORMAT_PRECISION_SCALE,
+                                column.typeName(), column.length(), 
column.scale().get()));
+            } else {
+                oracleType.put(
+                        column.name(),
+                        String.format(FORMAT_PRECISION, column.typeName(), 
column.length()));
+            }
+        }
+        return oracleType;
+    }
+
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
index 4d6c5e8aff..a927088e9f 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static org.apache.inlong.sort.cdc.base.util.MetaDataUtil.getType;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
 import static 
org.apache.inlong.sort.cdc.mysql.utils.OperationUtils.generateOperation;
 
@@ -57,9 +58,6 @@ public class MetaDataUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MetaDataUtils.class);
 
-    private static final String FORMAT_PRECISION = "%s(%d)";
-    private static final String FORMAT_PRECISION_SCALE = "%s(%d, %d)";
-
     /**
      * get sql type from table schema, represents the jdbc data type
      *
@@ -138,29 +136,6 @@ public class MetaDataUtils {
         return tableSchema.getTable().primaryKeyColumnNames();
     }
 
-    public static Map<String, String> getMysqlType(@Nullable 
TableChanges.TableChange tableSchema) {
-        if (tableSchema == null) {
-            return null;
-        }
-        Map<String, String> mysqlType = new LinkedHashMap<>();
-        final Table table = tableSchema.getTable();
-        table.columns()
-                .forEach(
-                        column -> {
-                            if (column.scale().isPresent()) {
-                                mysqlType.put(
-                                        column.name(),
-                                        String.format(FORMAT_PRECISION_SCALE,
-                                                column.typeName(), 
column.length(), column.scale().get()));
-                            } else {
-                                mysqlType.put(
-                                        column.name(),
-                                        String.format(FORMAT_PRECISION, 
column.typeName(), column.length()));
-                            }
-                        });
-        return mysqlType;
-    }
-
     public static StringData getCanalData(SourceRecord record, GenericRowData 
rowData,
             TableChange tableSchema) {
         Struct messageStruct = (Struct) record.value();
@@ -178,7 +153,7 @@ public class MetaDataUtils {
         CanalJson canalJson = CanalJson.builder()
                 .database(databaseName)
                 .es(opTs).pkNames(getPkNames(tableSchema))
-                .mysqlType(getMysqlType(tableSchema)).table(tableName)
+                .mysqlType(getType(tableSchema)).table(tableName)
                 .incremental(!isSnapshotRecord(sourceStruct))
                 
.dataSourceName(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
                 .type(getCanalOpType(rowData))
@@ -218,7 +193,7 @@ public class MetaDataUtils {
                 
.name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
                 .sqlType(getSqlType(tableSchema))
                 .pkNames(getPkNames(tableSchema))
-                .mysqlType(getMysqlType(tableSchema))
+                .mysqlType(getType(tableSchema))
                 .build();
         DebeziumJson debeziumJson = DebeziumJson.builder().source(source)
                 
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
index 2a40f0cda4..a87c4c33f4 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
@@ -23,7 +23,6 @@ import org.apache.inlong.sort.formats.json.canal.CanalJson;
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
 import io.debezium.data.Envelope.FieldName;
-import io.debezium.relational.Column;
 import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
 import io.debezium.relational.history.TableChanges.TableChange;
@@ -47,6 +46,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.inlong.sort.cdc.base.util.MetaDataUtil.getType;
+
 /** Defines the supported metadata columns for {@link OracleTableSource}. */
 public enum OracleReadableMetaData {
 
@@ -269,7 +270,7 @@ public enum OracleReadableMetaData {
                 @Override
                 public Object read(
                         SourceRecord record, @Nullable 
TableChanges.TableChange tableSchema) {
-                    Map<String, String> oracleType = 
getOracleType(tableSchema);
+                    Map<String, String> oracleType = getType(tableSchema);
                     if (oracleType == null) {
                         return null;
                     }
@@ -385,7 +386,7 @@ public enum OracleReadableMetaData {
         CanalJson canalJson = CanalJson.builder()
                 .data(dataList).database(databaseName).schema(schemaName)
                 .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
-                .oracleType(getOracleType(tableSchema))
+                .oracleType(getType(tableSchema))
                 .table(tableName).ts(ts)
                 
.type(getCanalOpType(data)).sqlType(getSqlType(tableSchema)).build();
         try {
@@ -405,9 +406,6 @@ public enum OracleReadableMetaData {
     private static final String OP_INSERT = "INSERT";
     private static final String OP_DELETE = "DELETE";
     private static final String OP_UPDATE = "UPDATE";
-    private static final String REGEX_FORMATTED = "\\w.+\\([\\d ,]+\\)";
-    private static final String FORMAT_PRECISION = "%s(%d)";
-    private static final String FORMAT_PRECISION_SCALE = "%s(%d, %d)";
 
     OracleReadableMetaData(String key, DataType dataType, MetadataConverter 
converter) {
         this.key = key;
@@ -453,32 +451,6 @@ public enum OracleReadableMetaData {
         return tableSchema.getTable().primaryKeyColumnNames();
     }
 
-    public static Map<String, String> getOracleType(@Nullable 
TableChanges.TableChange tableSchema) {
-        if (tableSchema == null) {
-            return null;
-        }
-        Map<String, String> oracleType = new LinkedHashMap<>();
-        final Table table = tableSchema.getTable();
-        for (Column column : table.columns()) {
-            // The typeName contains precision and does not need to be 
formatted.
-            if (column.typeName().matches(REGEX_FORMATTED)) {
-                oracleType.put(column.name(), column.typeName());
-                continue;
-            }
-            if (column.scale().isPresent()) {
-                oracleType.put(
-                        column.name(),
-                        String.format(FORMAT_PRECISION_SCALE,
-                                column.typeName(), column.length(), 
column.scale().get()));
-            } else {
-                oracleType.put(
-                        column.name(),
-                        String.format(FORMAT_PRECISION, column.typeName(), 
column.length()));
-            }
-        }
-        return oracleType;
-    }
-
     public static Map<String, Integer> getSqlType(@Nullable 
TableChanges.TableChange tableSchema) {
         if (tableSchema == null) {
             return null;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
index 383d899050..5831fccb63 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
@@ -23,6 +23,7 @@ import 
org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
 import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
 
 import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.SnapshotRecord;
 import io.debezium.data.Envelope;
 import io.debezium.data.Envelope.FieldName;
 import io.debezium.relational.Table;
@@ -48,6 +49,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.inlong.sort.cdc.base.util.MetaDataUtil.getType;
+
 /**
  * Defines the supported metadata columns for {@link PostgreSQLTableSource}.
  */
@@ -410,7 +413,9 @@ public enum PostgreSQLReadableMetaData {
                 .pkNames(getPkNames(tableSchema))
                 .table(tableName)
                 .ts(ts)
+                .postGreType(getType(tableSchema))
                 .type(getCanalOpType(rowData))
+                .incremental(isIncrementalRecord(sourceStruct))
                 .sqlType(getSqlType(tableSchema))
                 .build();
         try {
@@ -454,6 +459,10 @@ public enum PostgreSQLReadableMetaData {
         return opType;
     }
 
+    private static boolean isIncrementalRecord(Struct sourceStruct) {
+        return !(SnapshotRecord.TRUE == 
SnapshotRecord.fromSource(sourceStruct));
+    }
+
     /**
      * get primary key names
      *
diff --git 
a/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
index c0e74ce7a0..1e168a5240 100644
--- 
a/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
@@ -70,6 +70,8 @@ public class CanalJson {
     private Boolean incremental;
     @JsonProperty("dataSourceName")
     private String dataSourceName;
+    @JsonProperty("postGreType")
+    private Map<String, String> postGreType;
 
     @JsonCreator
     public CanalJson(@Nullable @JsonProperty("data") List<Map<String, Object>> 
data,
@@ -87,7 +89,8 @@ public class CanalJson {
             @Nullable @JsonProperty("oracleType") Map<String, String> 
oracleType,
             @JsonProperty("operation") Operation operation,
             @JsonProperty("incremental") Boolean incremental,
-            @JsonProperty("dataSourceName") String dataSourceName) {
+            @JsonProperty("dataSourceName") String dataSourceName,
+            @Nullable @JsonProperty("postGreType") Map<String, String> 
postGreType) {
         this.data = data;
         this.es = es;
         this.table = table;
@@ -104,6 +107,7 @@ public class CanalJson {
         this.operation = operation;
         this.incremental = incremental;
         this.dataSourceName = dataSourceName;
+        this.postGreType = postGreType;
     }
 
 }

Reply via email to