This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4de2b759f [INLONG-3778][Sort] FieldInfo enhanced to support transform (#3778)
4de2b759f is described below

commit 4de2b759f0119d4827c510cc15485315780fd6fe
Author: yunqingmoswu <[email protected]>
AuthorDate: Tue Apr 19 12:53:58 2022 +0800

    [INLONG-3778][Sort] FieldInfo enhanced to support transform (#3778)
---
 inlong-sort/sort-common/pom.xml                    |  8 +++-
 .../inlong/sort/protocol/BuiltInFieldInfo.java     | 37 +++++++++------
 .../org/apache/inlong/sort/protocol/FieldInfo.java | 55 +++++++++++++++-------
 .../protocol/transformation/FunctionParam.java}    | 34 +++++++------
 .../inlong/sort/protocol/BuiltInFieldInfoTest.java |  4 +-
 .../apache/inlong/sort/protocol/FieldInfoTest.java | 38 +++++++++++++--
 .../sort/protocol/kafka/KafkaSinkInfoTest.java     |  2 +-
 .../transformation/FieldMappingRuleTest.java       | 10 ++--
 .../transformation/TransformationInfoTest.java     | 10 ++--
 9 files changed, 135 insertions(+), 63 deletions(-)

diff --git a/inlong-sort/sort-common/pom.xml b/inlong-sort/sort-common/pom.xml
index a2a9e2b23..1eac4d0c4 100644
--- a/inlong-sort/sort-common/pom.xml
+++ b/inlong-sort/sort-common/pom.xml
@@ -18,8 +18,8 @@
     under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
@@ -66,6 +66,10 @@
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-framework</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
index 60904ca34..d3cdc6460 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
@@ -26,31 +26,30 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
  */
 public class BuiltInFieldInfo extends FieldInfo {
 
-    public enum BuiltInField {
-        DATA_TIME,
-        MYSQL_METADATA_DATABASE,
-        MYSQL_METADATA_TABLE,
-        MYSQL_METADATA_EVENT_TIME,
-        MYSQL_METADATA_IS_DDL,
-        MYSQL_METADATA_EVENT_TYPE,
-        MYSQL_METADATA_DATA
-    }
-
     private static final long serialVersionUID = -3436204467879205139L;
 
-    @JsonProperty("builtin_field")
+    @JsonProperty("builtinField")
     private final BuiltInField builtInField;
 
     @JsonCreator
     public BuiltInFieldInfo(
             @JsonProperty("name") String name,
-            @JsonProperty("format_info") FormatInfo formatInfo,
-            @JsonProperty("builtin_field") BuiltInField builtInField) {
+            @JsonProperty("nodeId") String nodeId,
+            @JsonProperty("formatInfo") FormatInfo formatInfo,
+            @JsonProperty("builtinField") BuiltInField builtInField) {
+        super(name, nodeId, formatInfo);
+        this.builtInField = builtInField;
+    }
+
+    public BuiltInFieldInfo(
+            @JsonProperty("name") String name,
+            @JsonProperty("formatInfo") FormatInfo formatInfo,
+            @JsonProperty("builtinField") BuiltInField builtInField) {
         super(name, formatInfo);
         this.builtInField = builtInField;
     }
 
-    @JsonProperty("builtin_field")
+    @JsonProperty("builtinField")
     public BuiltInField getBuiltInField() {
         return builtInField;
     }
@@ -70,4 +69,14 @@ public class BuiltInFieldInfo extends FieldInfo {
         return builtInField == that.builtInField
                 && super.equals(that);
     }
+
+    public enum BuiltInField {
+        DATA_TIME,
+        MYSQL_METADATA_DATABASE,
+        MYSQL_METADATA_TABLE,
+        MYSQL_METADATA_EVENT_TIME,
+        MYSQL_METADATA_IS_DDL,
+        MYSQL_METADATA_EVENT_TYPE,
+        MYSQL_METADATA_DATA
+    }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index 7417d8c17..3f2e1e0d3 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -18,11 +18,17 @@
 package org.apache.inlong.sort.protocol;
 
 import com.google.common.base.Preconditions;
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.formats.common.FormatInfo;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
 
 import java.io.Serializable;
 import java.util.Objects;
@@ -35,34 +41,50 @@ import java.util.Objects;
         @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
         @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
 })
-public class FieldInfo implements Serializable {
+@Data
+public class FieldInfo implements FunctionParam, Serializable {
 
     private static final long serialVersionUID = 5871970550803344673L;
-
     @JsonProperty("name")
     private final String name;
-
-    @JsonProperty("format_info")
+    @JsonInclude(Include.NON_NULL)
+    @JsonProperty("nodeId")
+    private String nodeId;
+    @JsonIgnore
+    private String tableNameAlias;
+    @JsonProperty("formatInfo")
     private FormatInfo formatInfo;
 
-    @JsonCreator
     public FieldInfo(
             @JsonProperty("name") String name,
-            @JsonProperty("format_info") FormatInfo formatInfo) {
+            @JsonProperty("formatInfo") FormatInfo formatInfo) {
         this.name = Preconditions.checkNotNull(name);
         this.formatInfo = Preconditions.checkNotNull(formatInfo);
     }
 
-    public String getName() {
-        return name;
-    }
-
-    public FormatInfo getFormatInfo() {
-        return formatInfo;
+    @JsonCreator
+    public FieldInfo(
+            @JsonProperty("name") String name,
+            @JsonProperty("nodeId") String nodeId,
+            @JsonProperty("formatInfo") FormatInfo formatInfo) {
+        this.name = Preconditions.checkNotNull(name);
+        this.nodeId = nodeId;
+        this.formatInfo = Preconditions.checkNotNull(formatInfo);
     }
 
-    public void setFormatInfo(FormatInfo formatInfo) {
-        this.formatInfo = formatInfo;
+    @Override
+    public String format() {
+        String formatName = name.trim();
+        if (!formatName.startsWith("`")) {
+            formatName = String.format("`%s", formatName);
+        }
+        if (!formatName.endsWith("`")) {
+            formatName = String.format("%s`", formatName);
+        }
+        if (StringUtils.isNotBlank(tableNameAlias)) {
+            return String.format("%s.%s", tableNameAlias, formatName);
+        }
+        return formatName;
     }
 
     @Override
@@ -81,4 +103,5 @@ public class FieldInfo implements Serializable {
     public int hashCode() {
         return Objects.hash(name, formatInfo);
     }
+
 }
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
similarity index 50%
copy from 
inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index 7d5872189..849029ec5 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -15,21 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol;
+package org.apache.inlong.sort.protocol.transformation;
 
-import static org.junit.Assert.assertEquals;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
 
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Test;
+@JsonTypeInfo(
+        use = JsonTypeInfo.Id.NAME,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
+        @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
+})
+public interface FunctionParam {
+
+    @JsonIgnore
+    String getName();
+
+    String format();
 
-public class FieldInfoTest {
-    @Test
-    public void testSerialize() throws JsonProcessingException {
-        FieldInfo fieldInfo = new FieldInfo("field_name", 
StringFormatInfo.INSTANCE);
-        ObjectMapper objectMapper = new ObjectMapper();
-        String expected = 
"{\"type\":\"base\",\"name\":\"field_name\",\"format_info\":{\"type\":\"string\"}}";
-        assertEquals(expected, objectMapper.writeValueAsString(fieldInfo));
-    }
 }
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
index 2cc2267d8..2ec1b48a1 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
@@ -27,10 +27,10 @@ public class BuiltInFieldInfoTest extends ProtocolBaseTest {
         expectedJson = "{\n"
                 + "    \"type\":\"builtin\",\n"
                 + "    \"name\":\"f1\",\n"
-                + "    \"format_info\":{\n"
+                + "    \"formatInfo\":{\n"
                 + "        \"type\":\"string\"\n"
                 + "    },\n"
-                + "    \"builtin_field\":\"DATA_TIME\"\n"
+                + "    \"builtinField\":\"DATA_TIME\"\n"
                 + "}";
 
         equalObj1 = expectedObject;
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
index 7d5872189..6946bd3c8 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
@@ -17,19 +17,49 @@
 
 package org.apache.inlong.sort.protocol;
 
-import static org.junit.Assert.assertEquals;
-
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class FieldInfoTest {
+
     @Test
     public void testSerialize() throws JsonProcessingException {
         FieldInfo fieldInfo = new FieldInfo("field_name", 
StringFormatInfo.INSTANCE);
         ObjectMapper objectMapper = new ObjectMapper();
-        String expected = 
"{\"type\":\"base\",\"name\":\"field_name\",\"format_info\":{\"type\":\"string\"}}";
+        String expected = 
"{\"type\":\"base\",\"name\":\"field_name\",\"formatInfo\":{\"type\":\"string\"}}";
+        assertEquals(expected, objectMapper.writeValueAsString(fieldInfo));
+    }
+
+    @Test
+    public void testDeserialize() throws JsonProcessingException {
+        FieldInfo fieldInfo = new FieldInfo("field_name", 
StringFormatInfo.INSTANCE);
+        ObjectMapper objectMapper = new ObjectMapper();
+        String fieldInfoStr = 
"{\"type\":\"base\",\"name\":\"field_name\",\"formatInfo\":{\"type\":\"string\"}}";
+        FieldInfo expected = objectMapper.readValue(fieldInfoStr, 
FieldInfo.class);
+        assertEquals(expected, fieldInfo);
+    }
+
+    @Test
+    public void testSerializeWithNodeId() throws JsonProcessingException {
+        FieldInfo fieldInfo = new FieldInfo("field_name", "1", 
StringFormatInfo.INSTANCE);
+        ObjectMapper objectMapper = new ObjectMapper();
+        String expected = "{\"type\":\"base\",\"name\":\"field_name\","
+                + "\"formatInfo\":{\"type\":\"string\"},\"nodeId\":\"1\"}";
         assertEquals(expected, objectMapper.writeValueAsString(fieldInfo));
     }
+
+    @Test
+    public void testDeserializeWithNodeId() throws JsonProcessingException {
+        FieldInfo fieldInfo = new FieldInfo("field_name", 
StringFormatInfo.INSTANCE);
+        fieldInfo.setNodeId("1L");
+        ObjectMapper objectMapper = new ObjectMapper();
+        String fieldInfoStr = "{\"type\":\"base\",\"name\":\"field_name\","
+                + "\"formatInfo\":{\"type\":\"string\"},\"nodeId\":\"1\"}";
+        FieldInfo expected = objectMapper.readValue(fieldInfoStr, 
FieldInfo.class);
+        assertEquals(expected, fieldInfo);
+    }
 }
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
index dfaacb0f3..8bfe2e229 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
@@ -41,7 +41,7 @@ public class KafkaSinkInfoTest extends ProtocolBaseTest {
                 + "        {\n"
                 + "            \"type\":\"base\",\n"
                 + "            \"name\":\"field1\",\n"
-                + "            \"format_info\":{\n"
+                + "            \"formatInfo\":{\n"
                 + "                \"type\":\"string\"\n"
                 + "            }\n"
                 + "        }\n"
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldMappingRuleTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldMappingRuleTest.java
index 90f4beb91..b3fe8d2fe 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldMappingRuleTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldMappingRuleTest.java
@@ -26,7 +26,7 @@ public class FieldMappingRuleTest extends ProtocolBaseTest {
 
     @Override
     public void init() {
-        expectedObject = new FieldMappingRule(new 
FieldMappingRule.FieldMappingUnit[] {
+        expectedObject = new FieldMappingRule(new 
FieldMappingRule.FieldMappingUnit[]{
                 new FieldMappingRule.FieldMappingUnit(
                         new FieldInfo("f1", StringFormatInfo.INSTANCE),
                         new FieldInfo("f2", DoubleFormatInfo.INSTANCE)
@@ -40,14 +40,14 @@ public class FieldMappingRuleTest extends ProtocolBaseTest {
                 + "            \"source_field\":{\n"
                 + "                \"type\":\"base\",\n"
                 + "                \"name\":\"f1\",\n"
-                + "                \"format_info\":{\n"
+                + "                \"formatInfo\":{\n"
                 + "                    \"type\":\"string\"\n"
                 + "                }\n"
                 + "            },\n"
                 + "            \"sink_field\":{\n"
                 + "                \"type\":\"base\",\n"
                 + "                \"name\":\"f2\",\n"
-                + "                \"format_info\":{\n"
+                + "                \"formatInfo\":{\n"
                 + "                    \"type\":\"double\"\n"
                 + "                }\n"
                 + "            }\n"
@@ -56,13 +56,13 @@ public class FieldMappingRuleTest extends ProtocolBaseTest {
                 + "}";
 
         equalObj1 = expectedObject;
-        equalObj2 = new FieldMappingRule(new 
FieldMappingRule.FieldMappingUnit[] {
+        equalObj2 = new FieldMappingRule(new 
FieldMappingRule.FieldMappingUnit[]{
                 new FieldMappingRule.FieldMappingUnit(
                         new FieldInfo("f1", StringFormatInfo.INSTANCE),
                         new FieldInfo("f2", DoubleFormatInfo.INSTANCE)
                 )
         });
-        unequalObj = new FieldMappingRule(new 
FieldMappingRule.FieldMappingUnit[] {
+        unequalObj = new FieldMappingRule(new 
FieldMappingRule.FieldMappingUnit[]{
                 new FieldMappingRule.FieldMappingUnit(
                         new FieldInfo("f1", StringFormatInfo.INSTANCE),
                         new FieldInfo("f3", DoubleFormatInfo.INSTANCE)
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/TransformationInfoTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/TransformationInfoTest.java
index 99ae6348b..007833900 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/TransformationInfoTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/TransformationInfoTest.java
@@ -27,7 +27,7 @@ public class TransformationInfoTest extends ProtocolBaseTest {
     @Override
     public void init() {
         expectedObject = new TransformationInfo(
-                new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[] {
+                new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[]{
                         new FieldMappingRule.FieldMappingUnit(
                                 new FieldInfo("f1", StringFormatInfo.INSTANCE),
                                 new FieldInfo("f2", DoubleFormatInfo.INSTANCE)
@@ -43,14 +43,14 @@ public class TransformationInfoTest extends ProtocolBaseTest {
                 + "                \"source_field\":{\n"
                 + "                    \"type\":\"base\",\n"
                 + "                    \"name\":\"f1\",\n"
-                + "                    \"format_info\":{\n"
+                + "                    \"formatInfo\":{\n"
                 + "                        \"type\":\"string\"\n"
                 + "                    }\n"
                 + "                },\n"
                 + "                \"sink_field\":{\n"
                 + "                    \"type\":\"base\",\n"
                 + "                    \"name\":\"f2\",\n"
-                + "                    \"format_info\":{\n"
+                + "                    \"formatInfo\":{\n"
                 + "                        \"type\":\"double\"\n"
                 + "                    }\n"
                 + "                }\n"
@@ -61,7 +61,7 @@ public class TransformationInfoTest extends ProtocolBaseTest {
 
         equalObj1 = expectedObject;
         equalObj2 = new TransformationInfo(
-                new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[] {
+                new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[]{
                         new FieldMappingRule.FieldMappingUnit(
                                 new FieldInfo("f1", StringFormatInfo.INSTANCE),
                                 new FieldInfo("f2", DoubleFormatInfo.INSTANCE)
@@ -69,7 +69,7 @@ public class TransformationInfoTest extends ProtocolBaseTest {
                 })
         );
         unequalObj = new TransformationInfo(
-                new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[] {
+                new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[]{
                         new FieldMappingRule.FieldMappingUnit(
                                 new FieldInfo("f1", StringFormatInfo.INSTANCE),
                                 new FieldInfo("f3", DoubleFormatInfo.INSTANCE)

Reply via email to