This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5f529bfb7b [INLONG-8745][Sort] Add incremental and PostGre type in
postgre connector (#8746)
5f529bfb7b is described below
commit 5f529bfb7bebc24a48e53ea7d8f2c69d3df7b943
Author: Sting <[email protected]>
AuthorDate: Thu Aug 17 10:17:53 2023 +0800
[INLONG-8745][Sort] Add incremental and PostGre type in postgre connector
(#8746)
---
.../inlong/sort/cdc/base/util/MetaDataUtil.java | 69 ++++++++++++++++++++++
.../inlong/sort/cdc/mysql/utils/MetaDataUtils.java | 31 +---------
.../cdc/oracle/table/OracleReadableMetaData.java | 36 ++---------
.../postgres/table/PostgreSQLReadableMetaData.java | 9 +++
.../inlong/sort/formats/json/canal/CanalJson.java | 6 +-
5 files changed, 90 insertions(+), 61 deletions(-)
diff --git
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/MetaDataUtil.java
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/MetaDataUtil.java
new file mode 100644
index 0000000000..247fd401fd
--- /dev/null
+++
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/MetaDataUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * This class handles the metadata of a cdc connector
+ */
+public class MetaDataUtil {
+
+ private static final String FORMAT_PRECISION = "%s(%d)";
+ private static final String FORMAT_PRECISION_SCALE = "%s(%d, %d)";
+ private static final String REGEX_FORMATTED = "\\w.+\\([\\d ,]+\\)";
+
+ /**
+ * get a map about column name and type
+ * @param tableSchema
+ * @return map of field name and field type
+ */
+ public static Map<String, String> getType(@Nullable
TableChanges.TableChange tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ Map<String, String> oracleType = new LinkedHashMap<>();
+ final Table table = tableSchema.getTable();
+ for (Column column : table.columns()) {
+ // The typeName contains precision and does not need to be
formatted.
+ if (column.typeName().matches(REGEX_FORMATTED)) {
+ oracleType.put(column.name(), column.typeName());
+ continue;
+ }
+ if (column.scale().isPresent()) {
+ oracleType.put(
+ column.name(),
+ String.format(FORMAT_PRECISION_SCALE,
+ column.typeName(), column.length(),
column.scale().get()));
+ } else {
+ oracleType.put(
+ column.name(),
+ String.format(FORMAT_PRECISION, column.typeName(),
column.length()));
+ }
+ }
+ return oracleType;
+ }
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
index 4d6c5e8aff..a927088e9f 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/utils/MetaDataUtils.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static org.apache.inlong.sort.cdc.base.util.MetaDataUtil.getType;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
import static
org.apache.inlong.sort.cdc.mysql.utils.OperationUtils.generateOperation;
@@ -57,9 +58,6 @@ public class MetaDataUtils {
private static final Logger LOG =
LoggerFactory.getLogger(MetaDataUtils.class);
- private static final String FORMAT_PRECISION = "%s(%d)";
- private static final String FORMAT_PRECISION_SCALE = "%s(%d, %d)";
-
/**
* get sql type from table schema, represents the jdbc data type
*
@@ -138,29 +136,6 @@ public class MetaDataUtils {
return tableSchema.getTable().primaryKeyColumnNames();
}
- public static Map<String, String> getMysqlType(@Nullable
TableChanges.TableChange tableSchema) {
- if (tableSchema == null) {
- return null;
- }
- Map<String, String> mysqlType = new LinkedHashMap<>();
- final Table table = tableSchema.getTable();
- table.columns()
- .forEach(
- column -> {
- if (column.scale().isPresent()) {
- mysqlType.put(
- column.name(),
- String.format(FORMAT_PRECISION_SCALE,
- column.typeName(),
column.length(), column.scale().get()));
- } else {
- mysqlType.put(
- column.name(),
- String.format(FORMAT_PRECISION,
column.typeName(), column.length()));
- }
- });
- return mysqlType;
- }
-
public static StringData getCanalData(SourceRecord record, GenericRowData
rowData,
TableChange tableSchema) {
Struct messageStruct = (Struct) record.value();
@@ -178,7 +153,7 @@ public class MetaDataUtils {
CanalJson canalJson = CanalJson.builder()
.database(databaseName)
.es(opTs).pkNames(getPkNames(tableSchema))
- .mysqlType(getMysqlType(tableSchema)).table(tableName)
+ .mysqlType(getType(tableSchema)).table(tableName)
.incremental(!isSnapshotRecord(sourceStruct))
.dataSourceName(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
.type(getCanalOpType(rowData))
@@ -218,7 +193,7 @@ public class MetaDataUtils {
.name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
.sqlType(getSqlType(tableSchema))
.pkNames(getPkNames(tableSchema))
- .mysqlType(getMysqlType(tableSchema))
+ .mysqlType(getType(tableSchema))
.build();
DebeziumJson debeziumJson = DebeziumJson.builder().source(source)
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
index 2a40f0cda4..a87c4c33f4 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
@@ -23,7 +23,6 @@ import org.apache.inlong.sort.formats.json.canal.CanalJson;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
-import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
@@ -47,6 +46,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.inlong.sort.cdc.base.util.MetaDataUtil.getType;
+
/** Defines the supported metadata columns for {@link OracleTableSource}. */
public enum OracleReadableMetaData {
@@ -269,7 +270,7 @@ public enum OracleReadableMetaData {
@Override
public Object read(
SourceRecord record, @Nullable
TableChanges.TableChange tableSchema) {
- Map<String, String> oracleType =
getOracleType(tableSchema);
+ Map<String, String> oracleType = getType(tableSchema);
if (oracleType == null) {
return null;
}
@@ -385,7 +386,7 @@ public enum OracleReadableMetaData {
CanalJson canalJson = CanalJson.builder()
.data(dataList).database(databaseName).schema(schemaName)
.sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
- .oracleType(getOracleType(tableSchema))
+ .oracleType(getType(tableSchema))
.table(tableName).ts(ts)
.type(getCanalOpType(data)).sqlType(getSqlType(tableSchema)).build();
try {
@@ -405,9 +406,6 @@ public enum OracleReadableMetaData {
private static final String OP_INSERT = "INSERT";
private static final String OP_DELETE = "DELETE";
private static final String OP_UPDATE = "UPDATE";
- private static final String REGEX_FORMATTED = "\\w.+\\([\\d ,]+\\)";
- private static final String FORMAT_PRECISION = "%s(%d)";
- private static final String FORMAT_PRECISION_SCALE = "%s(%d, %d)";
OracleReadableMetaData(String key, DataType dataType, MetadataConverter
converter) {
this.key = key;
@@ -453,32 +451,6 @@ public enum OracleReadableMetaData {
return tableSchema.getTable().primaryKeyColumnNames();
}
- public static Map<String, String> getOracleType(@Nullable
TableChanges.TableChange tableSchema) {
- if (tableSchema == null) {
- return null;
- }
- Map<String, String> oracleType = new LinkedHashMap<>();
- final Table table = tableSchema.getTable();
- for (Column column : table.columns()) {
- // The typeName contains precision and does not need to be
formatted.
- if (column.typeName().matches(REGEX_FORMATTED)) {
- oracleType.put(column.name(), column.typeName());
- continue;
- }
- if (column.scale().isPresent()) {
- oracleType.put(
- column.name(),
- String.format(FORMAT_PRECISION_SCALE,
- column.typeName(), column.length(),
column.scale().get()));
- } else {
- oracleType.put(
- column.name(),
- String.format(FORMAT_PRECISION, column.typeName(),
column.length()));
- }
- }
- return oracleType;
- }
-
public static Map<String, Integer> getSqlType(@Nullable
TableChanges.TableChange tableSchema) {
if (tableSchema == null) {
return null;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
index 383d899050..5831fccb63 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.SnapshotRecord;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
import io.debezium.relational.Table;
@@ -48,6 +49,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.inlong.sort.cdc.base.util.MetaDataUtil.getType;
+
/**
* Defines the supported metadata columns for {@link PostgreSQLTableSource}.
*/
@@ -410,7 +413,9 @@ public enum PostgreSQLReadableMetaData {
.pkNames(getPkNames(tableSchema))
.table(tableName)
.ts(ts)
+ .postGreType(getType(tableSchema))
.type(getCanalOpType(rowData))
+ .incremental(isIncrementalRecord(sourceStruct))
.sqlType(getSqlType(tableSchema))
.build();
try {
@@ -454,6 +459,10 @@ public enum PostgreSQLReadableMetaData {
return opType;
}
+ private static boolean isIncrementalRecord(Struct sourceStruct) {
+ return !(SnapshotRecord.TRUE ==
SnapshotRecord.fromSource(sourceStruct));
+ }
+
/**
* get primary key names
*
diff --git
a/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
index c0e74ce7a0..1e168a5240 100644
---
a/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-formats/format-json-v1.13/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
@@ -70,6 +70,8 @@ public class CanalJson {
private Boolean incremental;
@JsonProperty("dataSourceName")
private String dataSourceName;
+ @JsonProperty("postGreType")
+ private Map<String, String> postGreType;
@JsonCreator
public CanalJson(@Nullable @JsonProperty("data") List<Map<String, Object>>
data,
@@ -87,7 +89,8 @@ public class CanalJson {
@Nullable @JsonProperty("oracleType") Map<String, String>
oracleType,
@JsonProperty("operation") Operation operation,
@JsonProperty("incremental") Boolean incremental,
- @JsonProperty("dataSourceName") String dataSourceName) {
+ @JsonProperty("dataSourceName") String dataSourceName,
+ @Nullable @JsonProperty("postGreType") Map<String, String>
postGreType) {
this.data = data;
this.es = es;
this.table = table;
@@ -104,6 +107,7 @@ public class CanalJson {
this.operation = operation;
this.incremental = incremental;
this.dataSourceName = dataSourceName;
+ this.postGreType = postGreType;
}
}