This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 27f8c9cff [INLONG-6311][Sort] Fix missing fields error for the old Canal JSON data in multiple sink scenes (#6312)
27f8c9cff is described below

commit 27f8c9cffe4372e3248de79001fe028adba74986
Author: thesumery <[email protected]>
AuthorDate: Fri Oct 28 15:15:57 2022 +0800

    [INLONG-6311][Sort] Fix missing fields error for the old Canal JSON data in multiple sink scenes (#6312)
    
    Co-authored-by: thesumery <[email protected]>
---
 .../sort/base/format/JsonDynamicSchemaFormat.java  |  2 +-
 .../format/CanalJsonDynamicSchemaFormatTest.java   | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index f49f01c0b..a0d564d0c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -99,7 +99,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
 
     protected JsonDynamicSchemaFormat() {
         this.rowDataConverters =
-                new JsonToRowDataConverters(true, false, TimestampFormat.ISO_8601);
+                new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601);
     }
 
     /**
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
index ffa45e8c2..dc7ce5ec8 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
@@ -18,11 +18,16 @@
 package org.apache.inlong.sort.base.format;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.types.RowKind;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -96,6 +101,25 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
         Assert.assertEquals(values, Collections.singletonList("111"));
     }
 
+    @Test
+    public void testExtractRowData() throws IOException {
+        JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+                .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+        List<RowData> values = getDynamicSchemaFormat().extractRowData(rootNode);
+        List<RowData> rowDataList = new ArrayList<>();
+        rowDataList.add(GenericRowData.ofKind(RowKind.UPDATE_BEFORE,
+                111,
+                BinaryStringData.fromString("scooter"),
+                BinaryStringData.fromString("Big 2-wheel scooter"),
+                5.15f));
+        rowDataList.add(GenericRowData.ofKind(RowKind.UPDATE_AFTER,
+                111,
+                BinaryStringData.fromString("scooter"),
+                BinaryStringData.fromString("Big 2-wheel scooter"),
+                5.18f));
+        Assert.assertEquals(values, rowDataList);
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {

Reply via email to