This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4de2b759f [INLONG-3778][Sort] FieldInfo enhanced to support transform
(#3778)
4de2b759f is described below
commit 4de2b759f0119d4827c510cc15485315780fd6fe
Author: yunqingmoswu <[email protected]>
AuthorDate: Tue Apr 19 12:53:58 2022 +0800
[INLONG-3778][Sort] FieldInfo enhanced to support transform (#3778)
---
inlong-sort/sort-common/pom.xml | 8 +++-
.../inlong/sort/protocol/BuiltInFieldInfo.java | 37 +++++++++------
.../org/apache/inlong/sort/protocol/FieldInfo.java | 55 +++++++++++++++-------
.../protocol/transformation/FunctionParam.java} | 34 +++++++------
.../inlong/sort/protocol/BuiltInFieldInfoTest.java | 4 +-
.../apache/inlong/sort/protocol/FieldInfoTest.java | 38 +++++++++++++--
.../sort/protocol/kafka/KafkaSinkInfoTest.java | 2 +-
.../transformation/FieldMappingRuleTest.java | 10 ++--
.../transformation/TransformationInfoTest.java | 10 ++--
9 files changed, 135 insertions(+), 63 deletions(-)
diff --git a/inlong-sort/sort-common/pom.xml b/inlong-sort/sort-common/pom.xml
index a2a9e2b23..1eac4d0c4 100644
--- a/inlong-sort/sort-common/pom.xml
+++ b/inlong-sort/sort-common/pom.xml
@@ -18,8 +18,8 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -66,6 +66,10 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
</dependencies>
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
index 60904ca34..d3cdc6460 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
@@ -26,31 +26,30 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
*/
public class BuiltInFieldInfo extends FieldInfo {
- public enum BuiltInField {
- DATA_TIME,
- MYSQL_METADATA_DATABASE,
- MYSQL_METADATA_TABLE,
- MYSQL_METADATA_EVENT_TIME,
- MYSQL_METADATA_IS_DDL,
- MYSQL_METADATA_EVENT_TYPE,
- MYSQL_METADATA_DATA
- }
-
private static final long serialVersionUID = -3436204467879205139L;
- @JsonProperty("builtin_field")
+ @JsonProperty("builtinField")
private final BuiltInField builtInField;
@JsonCreator
public BuiltInFieldInfo(
@JsonProperty("name") String name,
- @JsonProperty("format_info") FormatInfo formatInfo,
- @JsonProperty("builtin_field") BuiltInField builtInField) {
+ @JsonProperty("nodeId") String nodeId,
+ @JsonProperty("formatInfo") FormatInfo formatInfo,
+ @JsonProperty("builtinField") BuiltInField builtInField) {
+ super(name, nodeId, formatInfo);
+ this.builtInField = builtInField;
+ }
+
+ public BuiltInFieldInfo(
+ @JsonProperty("name") String name,
+ @JsonProperty("formatInfo") FormatInfo formatInfo,
+ @JsonProperty("builtinField") BuiltInField builtInField) {
super(name, formatInfo);
this.builtInField = builtInField;
}
- @JsonProperty("builtin_field")
+ @JsonProperty("builtinField")
public BuiltInField getBuiltInField() {
return builtInField;
}
@@ -70,4 +69,14 @@ public class BuiltInFieldInfo extends FieldInfo {
return builtInField == that.builtInField
&& super.equals(that);
}
+
+ public enum BuiltInField {
+ DATA_TIME,
+ MYSQL_METADATA_DATABASE,
+ MYSQL_METADATA_TABLE,
+ MYSQL_METADATA_EVENT_TIME,
+ MYSQL_METADATA_IS_DDL,
+ MYSQL_METADATA_EVENT_TYPE,
+ MYSQL_METADATA_DATA
+ }
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index 7417d8c17..3f2e1e0d3 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -18,11 +18,17 @@
package org.apache.inlong.sort.protocol;
import com.google.common.base.Preconditions;
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.sort.formats.common.FormatInfo;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
import java.io.Serializable;
import java.util.Objects;
@@ -35,34 +41,50 @@ import java.util.Objects;
@JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
@JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
})
-public class FieldInfo implements Serializable {
+@Data
+public class FieldInfo implements FunctionParam, Serializable {
private static final long serialVersionUID = 5871970550803344673L;
-
@JsonProperty("name")
private final String name;
-
- @JsonProperty("format_info")
+ @JsonInclude(Include.NON_NULL)
+ @JsonProperty("nodeId")
+ private String nodeId;
+ @JsonIgnore
+ private String tableNameAlias;
+ @JsonProperty("formatInfo")
private FormatInfo formatInfo;
- @JsonCreator
public FieldInfo(
@JsonProperty("name") String name,
- @JsonProperty("format_info") FormatInfo formatInfo) {
+ @JsonProperty("formatInfo") FormatInfo formatInfo) {
this.name = Preconditions.checkNotNull(name);
this.formatInfo = Preconditions.checkNotNull(formatInfo);
}
- public String getName() {
- return name;
- }
-
- public FormatInfo getFormatInfo() {
- return formatInfo;
+ @JsonCreator
+ public FieldInfo(
+ @JsonProperty("name") String name,
+ @JsonProperty("nodeId") String nodeId,
+ @JsonProperty("formatInfo") FormatInfo formatInfo) {
+ this.name = Preconditions.checkNotNull(name);
+ this.nodeId = nodeId;
+ this.formatInfo = Preconditions.checkNotNull(formatInfo);
}
- public void setFormatInfo(FormatInfo formatInfo) {
- this.formatInfo = formatInfo;
+ @Override
+ public String format() {
+ String formatName = name.trim();
+ if (!formatName.startsWith("`")) {
+ formatName = String.format("`%s", formatName);
+ }
+ if (!formatName.endsWith("`")) {
+ formatName = String.format("%s`", formatName);
+ }
+ if (StringUtils.isNotBlank(tableNameAlias)) {
+ return String.format("%s.%s", tableNameAlias, formatName);
+ }
+ return formatName;
}
@Override
@@ -81,4 +103,5 @@ public class FieldInfo implements Serializable {
public int hashCode() {
return Objects.hash(name, formatInfo);
}
+
}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
similarity index 50%
copy from
inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
copy to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index 7d5872189..849029ec5 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -15,21 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol;
+package org.apache.inlong.sort.protocol.transformation;
-import static org.junit.Assert.assertEquals;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Test;
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
+ @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
+})
+public interface FunctionParam {
+
+ @JsonIgnore
+ String getName();
+
+ String format();
-public class FieldInfoTest {
- @Test
- public void testSerialize() throws JsonProcessingException {
- FieldInfo fieldInfo = new FieldInfo("field_name",
StringFormatInfo.INSTANCE);
- ObjectMapper objectMapper = new ObjectMapper();
- String expected =
"{\"type\":\"base\",\"name\":\"field_name\",\"format_info\":{\"type\":\"string\"}}";
- assertEquals(expected, objectMapper.writeValueAsString(fieldInfo));
- }
}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
index 2cc2267d8..2ec1b48a1 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
@@ -27,10 +27,10 @@ public class BuiltInFieldInfoTest extends ProtocolBaseTest {
expectedJson = "{\n"
+ " \"type\":\"builtin\",\n"
+ " \"name\":\"f1\",\n"
- + " \"format_info\":{\n"
+ + " \"formatInfo\":{\n"
+ " \"type\":\"string\"\n"
+ " },\n"
- + " \"builtin_field\":\"DATA_TIME\"\n"
+ + " \"builtinField\":\"DATA_TIME\"\n"
+ "}";
equalObj1 = expectedObject;
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
index 7d5872189..6946bd3c8 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
@@ -17,19 +17,49 @@
package org.apache.inlong.sort.protocol;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
public class FieldInfoTest {
+
@Test
public void testSerialize() throws JsonProcessingException {
FieldInfo fieldInfo = new FieldInfo("field_name",
StringFormatInfo.INSTANCE);
ObjectMapper objectMapper = new ObjectMapper();
- String expected =
"{\"type\":\"base\",\"name\":\"field_name\",\"format_info\":{\"type\":\"string\"}}";
+ String expected =
"{\"type\":\"base\",\"name\":\"field_name\",\"formatInfo\":{\"type\":\"string\"}}";
+ assertEquals(expected, objectMapper.writeValueAsString(fieldInfo));
+ }
+
+ @Test
+ public void testDeserialize() throws JsonProcessingException {
+ FieldInfo fieldInfo = new FieldInfo("field_name",
StringFormatInfo.INSTANCE);
+ ObjectMapper objectMapper = new ObjectMapper();
+ String fieldInfoStr =
"{\"type\":\"base\",\"name\":\"field_name\",\"formatInfo\":{\"type\":\"string\"}}";
+ FieldInfo expected = objectMapper.readValue(fieldInfoStr,
FieldInfo.class);
+ assertEquals(expected, fieldInfo);
+ }
+
+ @Test
+ public void testSerializeWithNodeId() throws JsonProcessingException {
+ FieldInfo fieldInfo = new FieldInfo("field_name", "1",
StringFormatInfo.INSTANCE);
+ ObjectMapper objectMapper = new ObjectMapper();
+ String expected = "{\"type\":\"base\",\"name\":\"field_name\","
+ + "\"formatInfo\":{\"type\":\"string\"},\"nodeId\":\"1\"}";
assertEquals(expected, objectMapper.writeValueAsString(fieldInfo));
}
+
+ @Test
+ public void testDeserializeWithNodeId() throws JsonProcessingException {
+ FieldInfo fieldInfo = new FieldInfo("field_name",
StringFormatInfo.INSTANCE);
+ fieldInfo.setNodeId("1L");
+ ObjectMapper objectMapper = new ObjectMapper();
+ String fieldInfoStr = "{\"type\":\"base\",\"name\":\"field_name\","
+ + "\"formatInfo\":{\"type\":\"string\"},\"nodeId\":\"1\"}";
+ FieldInfo expected = objectMapper.readValue(fieldInfoStr,
FieldInfo.class);
+ assertEquals(expected, fieldInfo);
+ }
}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
index dfaacb0f3..8bfe2e229 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
@@ -41,7 +41,7 @@ public class KafkaSinkInfoTest extends ProtocolBaseTest {
+ " {\n"
+ " \"type\":\"base\",\n"
+ " \"name\":\"field1\",\n"
- + " \"format_info\":{\n"
+ + " \"formatInfo\":{\n"
+ " \"type\":\"string\"\n"
+ " }\n"
+ " }\n"
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldMappingRuleTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldMappingRuleTest.java
index 90f4beb91..b3fe8d2fe 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldMappingRuleTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldMappingRuleTest.java
@@ -26,7 +26,7 @@ public class FieldMappingRuleTest extends ProtocolBaseTest {
@Override
public void init() {
- expectedObject = new FieldMappingRule(new
FieldMappingRule.FieldMappingUnit[] {
+ expectedObject = new FieldMappingRule(new
FieldMappingRule.FieldMappingUnit[]{
new FieldMappingRule.FieldMappingUnit(
new FieldInfo("f1", StringFormatInfo.INSTANCE),
new FieldInfo("f2", DoubleFormatInfo.INSTANCE)
@@ -40,14 +40,14 @@ public class FieldMappingRuleTest extends ProtocolBaseTest {
+ " \"source_field\":{\n"
+ " \"type\":\"base\",\n"
+ " \"name\":\"f1\",\n"
- + " \"format_info\":{\n"
+ + " \"formatInfo\":{\n"
+ " \"type\":\"string\"\n"
+ " }\n"
+ " },\n"
+ " \"sink_field\":{\n"
+ " \"type\":\"base\",\n"
+ " \"name\":\"f2\",\n"
- + " \"format_info\":{\n"
+ + " \"formatInfo\":{\n"
+ " \"type\":\"double\"\n"
+ " }\n"
+ " }\n"
@@ -56,13 +56,13 @@ public class FieldMappingRuleTest extends ProtocolBaseTest {
+ "}";
equalObj1 = expectedObject;
- equalObj2 = new FieldMappingRule(new
FieldMappingRule.FieldMappingUnit[] {
+ equalObj2 = new FieldMappingRule(new
FieldMappingRule.FieldMappingUnit[]{
new FieldMappingRule.FieldMappingUnit(
new FieldInfo("f1", StringFormatInfo.INSTANCE),
new FieldInfo("f2", DoubleFormatInfo.INSTANCE)
)
});
- unequalObj = new FieldMappingRule(new
FieldMappingRule.FieldMappingUnit[] {
+ unequalObj = new FieldMappingRule(new
FieldMappingRule.FieldMappingUnit[]{
new FieldMappingRule.FieldMappingUnit(
new FieldInfo("f1", StringFormatInfo.INSTANCE),
new FieldInfo("f3", DoubleFormatInfo.INSTANCE)
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/TransformationInfoTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/TransformationInfoTest.java
index 99ae6348b..007833900 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/TransformationInfoTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/TransformationInfoTest.java
@@ -27,7 +27,7 @@ public class TransformationInfoTest extends ProtocolBaseTest {
@Override
public void init() {
expectedObject = new TransformationInfo(
- new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[] {
+ new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[]{
new FieldMappingRule.FieldMappingUnit(
new FieldInfo("f1", StringFormatInfo.INSTANCE),
new FieldInfo("f2", DoubleFormatInfo.INSTANCE)
@@ -43,14 +43,14 @@ public class TransformationInfoTest extends
ProtocolBaseTest {
+ " \"source_field\":{\n"
+ " \"type\":\"base\",\n"
+ " \"name\":\"f1\",\n"
- + " \"format_info\":{\n"
+ + " \"formatInfo\":{\n"
+ " \"type\":\"string\"\n"
+ " }\n"
+ " },\n"
+ " \"sink_field\":{\n"
+ " \"type\":\"base\",\n"
+ " \"name\":\"f2\",\n"
- + " \"format_info\":{\n"
+ + " \"formatInfo\":{\n"
+ " \"type\":\"double\"\n"
+ " }\n"
+ " }\n"
@@ -61,7 +61,7 @@ public class TransformationInfoTest extends ProtocolBaseTest {
equalObj1 = expectedObject;
equalObj2 = new TransformationInfo(
- new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[] {
+ new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[]{
new FieldMappingRule.FieldMappingUnit(
new FieldInfo("f1", StringFormatInfo.INSTANCE),
new FieldInfo("f2", DoubleFormatInfo.INSTANCE)
@@ -69,7 +69,7 @@ public class TransformationInfoTest extends ProtocolBaseTest {
})
);
unequalObj = new TransformationInfo(
- new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[] {
+ new FieldMappingRule(new FieldMappingRule.FieldMappingUnit[]{
new FieldMappingRule.FieldMappingUnit(
new FieldInfo("f1", StringFormatInfo.INSTANCE),
new FieldInfo("f3", DoubleFormatInfo.INSTANCE)