This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new be28a0789 [INLONG-6326][Sort] Fix the incorrect log type in the all
migrate converter (#6328)
be28a0789 is described below
commit be28a07899a1e4a9f80e6128acfae6bdb9973bba
Author: Schnapps <[email protected]>
AuthorDate: Mon Oct 31 13:24:00 2022 +0800
[INLONG-6326][Sort] Fix the incorrect log type in the all migrate converter
(#6328)
---
.../debezium/table/RowDataDebeziumDeserializeSchema.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index 6414620c5..204199598 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -596,9 +596,7 @@ public final class RowDataDebeziumDeserializeSchema
Schema fieldSchema = schema.field(fieldName).schema();
String schemaName = fieldSchema.name();
if (schemaName != null) {
- // normal type doesn't have schema name
- // schema names are time schemas
- fieldValue = getTimeValue(fieldValue, schemaName);
+ fieldValue = getValueWithSchema(fieldValue,
schemaName);
}
data.put(fieldName, fieldValue);
}
@@ -612,13 +610,13 @@ public final class RowDataDebeziumDeserializeSchema
}
/**
- * transform debezium time format to database format
+ * extract the data with the format provided by debezium
*
* @param fieldValue
* @param schemaName
- * @return
+ * @return the extracted data with schema
*/
- private Object getTimeValue(Object fieldValue, String schemaName) {
+ private Object getValueWithSchema(Object fieldValue, String schemaName) {
if (fieldValue == null) {
return null;
}
@@ -638,8 +636,11 @@ public final class RowDataDebeziumDeserializeSchema
fieldValue =
DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.ofInstant(instantTime,
serverTimeZone));
break;
+ case Decimal.LOGICAL_NAME:
+ // no need to transfer decimal type since the value is already
decimal
+ break;
default:
- LOG.error("parse schema {} error", schemaName);
+ LOG.debug("schema {} is not being supported", schemaName);
}
return fieldValue;
}