This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bcfad5446 [INLONG-3826][Sort] Enhance field format to support varchar
types and timestamp of different precisions (#3984)
bcfad5446 is described below
commit bcfad5446a07bad01b4e565bfaa82a675adc15de
Author: yunqingmoswu <[email protected]>
AuthorDate: Thu Apr 28 10:04:23 2022 +0800
[INLONG-3826][Sort] Enhance field format to support varchar types and
timestamp of different precisions (#3984)
---
.../apache/inlong/sort/protocol/node/LoadNode.java | 10 +--
.../sort/protocol/node/load/HiveLoadNode.java | 29 ++++++--
.../apache/inlong/sort/protocol/GroupInfoTest.java | 28 ++++---
.../inlong/sort/protocol/StreamInfoTest.java | 45 ++++++------
.../protocol/node/{ => load}/HiveLoadNodeTest.java | 10 ++-
.../protocol/node/transform/DistinctNodeTest.java | 2 +-
.../transformation/WatermarkFieldTest.java | 2 +-
.../function/HopEndFunctionTest.java | 2 +-
.../transformation/function/HopFunctionTest.java | 2 +-
.../function/HopStartFunctionTest.java | 2 +-
.../function/MultiValueFilterFunctionTest.java | 2 +-
.../function/SessionEndFunctionTest.java | 2 +-
.../function/SessionFunctionTest.java | 2 +-
.../function/SessionStartFunctionTest.java | 2 +-
.../function/SingleValueFilterFunctionTest.java | 2 +-
.../function/TumbleEndFunctionTest.java | 2 +-
.../function/TumbleFunctionTest.java | 2 +-
.../function/TumbleStartFunctionTest.java | 2 +-
.../inlong/sort/formats/base/TableFormatUtils.java | 36 ++++-----
.../inlong/sort/formats/common/FormatInfo.java | 4 +-
.../common/LocalZonedTimestampFormatInfo.java | 21 +++++-
.../sort/formats/common/TimestampFormatInfo.java | 41 ++++++++---
.../sort/formats/common/VarCharFormatInfo.java | 85 ++++++++++++++++++++++
.../sort/formats/common/VarCharFormatInfoTest.java | 46 ++++++++++++
24 files changed, 290 insertions(+), 91 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 94cc5202f..332700a33 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -18,10 +18,6 @@
package org.apache.inlong.sort.protocol.node;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
import lombok.Data;
import lombok.NoArgsConstructor;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -36,6 +32,11 @@ import
org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
@@ -88,5 +89,4 @@ public abstract class LoadNode implements Node {
this.sinkParallelism = sinkParallelism;
this.properties = properties;
}
-
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
index e5a2c6be9..71e26b140 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
@@ -18,22 +18,25 @@
package org.apache.inlong.sort.protocol.node.load;
import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.formats.common.LocalZonedTimestampFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("hiveLoad")
@Data
@@ -96,6 +99,22 @@ public class HiveLoadNode extends LoadNode implements
Serializable {
this.hiveVersion = Preconditions.checkNotNull(hiveVersion, "version of
hive is null");
this.hadoopConfDir = hadoopConfDir;
this.partitionFields = partitionFields;
+ handleTimestampField();
+ }
+
+ /**
+ * Dealing with problems caused by time precision in hive
+ * Hive connector requires the time precision of timestamp and localzoned
timestamp must be 9
+ */
+ private void handleTimestampField() {
+ getFields().forEach(f -> {
+ if (f.getFormatInfo() instanceof TimestampFormatInfo) {
+ ((TimestampFormatInfo) f.getFormatInfo()).setPrecision(9);
+ }
+ if (f.getFormatInfo() instanceof LocalZonedTimestampFormatInfo) {
+ ((LocalZonedTimestampFormatInfo)
f.getFormatInfo()).setPrecision(9);
+ }
+ });
}
@Override
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
index 0e0c20f69..ef99f1d1f 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
@@ -110,10 +110,12 @@ public class GroupInfoTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"formatInfo\":{\"type\":\"timestamp\","
+ + "\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}],"
+
"\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
+
"\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
- + "\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},\"interval\":{\"type\":\"stringConstant\","
+ + "\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ + "\"interval\":{\"type\":\"stringConstant\","
+
"\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\","
+
"\"tableNames\":[\"table\"],\"hostname\":\"localhost\",\"username\":\"username\","
@@ -124,7 +126,8 @@ public class GroupInfoTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ + "\"formatInfo\":{\"type\":\"timestamp\","
+ + "\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}],"
+
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
@@ -134,9 +137,10 @@ public class GroupInfoTest {
+
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
- + "\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
- +
"\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
+ + "\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ + "\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\","
+ +
"\"precision\":2}}}],\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
+
"\"format\":{\"type\":\"jsonFormat\",\"failOnMissingField\":false,\"ignoreParseErrors\":true,"
+
"\"timestampFormatStandard\":\"SQL\",\"mapNullKeyMode\":\"DROP\","
+
"\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
@@ -165,9 +169,10 @@ public class GroupInfoTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}],"
+
"\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
- +
"\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},"
+ + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ + "\"format\":\"yyyy-MM-dd HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+ "\"timeUnit\":{\"type\":\"timeUnitConstant\","
+
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
@@ -178,7 +183,7 @@ public class GroupInfoTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}],"
+
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
@@ -188,8 +193,9 @@ public class GroupInfoTest {
+
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
- + "\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+ + "\"format\":\"yyyy-MM-dd HH:mm:ss\",\"precision\":2}},"
+ + "\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}}],"
+
"\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
+
"\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
+
"\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
index 0063b7b1e..3679dc58b 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
@@ -17,12 +17,6 @@
package org.apache.inlong.sort.protocol;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.sort.formats.common.FloatFormatInfo;
@@ -43,6 +37,12 @@ import
org.apache.inlong.sort.protocol.transformation.WatermarkField;
import
org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import static org.junit.Assert.assertEquals;
+
/**
* StreamInfo unit test class
*/
@@ -130,9 +130,9 @@ public class StreamInfoTest {
+
"base\",\"name\":\"age\",\"formatInfo\":{\"type\":\"int\"}},{\"type\":"
+
"\"base\",\"name\":\"salary\",\"formatInfo\":{\"type\":\"float\"}},"
+
"{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
- + "\"format\":\"yyyy-MM-dd
HH:mm:ss\"}}],\"watermarkField\":{\"type\":\"watermark\","
+ + "\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}],\"watermarkField\":{\"type\":\"watermark\","
+
"\"timeAttr\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":"
- + "\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},\"interval\":{\"type\":"
+ + "\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},\"interval\":{\"type\":"
+
"\"stringConstant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\","
+
"\"tableNames\":[\"table\"],\"hostname\":\"localhost\",\"username\":\"username\","
@@ -143,7 +143,7 @@ public class StreamInfoTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":9}}],"
+
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{"
+
"\"type\":\"base\",\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},"
+
"\"outputField\":{\"type\":\"base\",\"name\":\"id\",\"formatInfo\":{\"type\":"
@@ -154,9 +154,9 @@ public class StreamInfoTest {
+
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\","
+
"\"name\":\"age\",\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":"
- + "\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},\"outputField\":{\"type\":"
+ + "\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},\"outputField\":{\"type\":"
+
"\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":"
- + "\"yyyy-MM-dd
HH:mm:ss\"}}}],\"sinkParallelism\":1,\"catalogName\":\"myHive\","
+ + "\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}}],\"sinkParallelism\":1,\"catalogName\":\"myHive\","
+
"\"database\":\"default\",\"tableName\":\"test\",\"hiveConfDir\":\"/opt/hive-conf\","
+
"\"hiveVersion\":\"3.1.2\",\"hadoopConfDir\":null,\"partitionFields\":[{\"type\":"
+
"\"base\",\"name\":\"day\",\"formatInfo\":{\"type\":\"long\"}}]}],\"relations\":"
@@ -178,9 +178,9 @@ public class StreamInfoTest {
+
"base\",\"name\":\"age\",\"formatInfo\":{\"type\":\"int\"}},{\"type\":"
+
"\"base\",\"name\":\"salary\",\"formatInfo\":{\"type\":\"float\"}},"
+
"{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
- + "\"format\":\"yyyy-MM-dd
HH:mm:ss\"}}],\"watermarkField\":{\"type\":\"watermark\","
+ + "\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":9}}],\"watermarkField\":{\"type\":\"watermark\","
+
"\"timeAttr\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":"
- + "\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},\"interval\":{\"type\":"
+ + "\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":9}},\"interval\":{\"type\":"
+
"\"stringConstant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\","
+
"\"tableNames\":[\"table\"],\"hostname\":\"localhost\",\"username\":\"username\","
@@ -191,7 +191,7 @@ public class StreamInfoTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":9}}],"
+
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{"
+
"\"type\":\"base\",\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},"
+
"\"outputField\":{\"type\":\"base\",\"name\":\"id\",\"formatInfo\":{\"type\":"
@@ -202,9 +202,9 @@ public class StreamInfoTest {
+
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\","
+
"\"name\":\"age\",\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":"
- + "\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},\"outputField\":{\"type\":"
+ + "\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":9}},\"outputField\":{\"type\":"
+
"\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":"
- + "\"yyyy-MM-dd
HH:mm:ss\"}}}],\"sinkParallelism\":1,\"catalogName\":\"myHive\","
+ + "\"yyyy-MM-dd
HH:mm:ss\",\"precision\":9}}}],\"sinkParallelism\":1,\"catalogName\":\"myHive\","
+
"\"database\":\"default\",\"tableName\":\"test\",\"hiveConfDir\":\"/opt/hive-conf\","
+
"\"hiveVersion\":\"3.1.2\",\"hadoopConfDir\":null,\"partitionFields\":[{\"type\":"
+
"\"base\",\"name\":\"day\",\"formatInfo\":{\"type\":\"long\"}}]}],\"relations\":"
@@ -231,10 +231,10 @@ public class StreamInfoTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}],"
+
"\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
- +
"\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},"
- + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+ +
"\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\","
+ +
"\"precision\":2}},\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+ "\"timeUnit\":{\"type\":\"timeUnitConstant\","
+
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
+
"\"hostname\":\"localhost\",\"username\":\"username\",\"password\":\"username\","
@@ -244,7 +244,7 @@ public class StreamInfoTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
+
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}],"
+
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
+
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
@@ -254,8 +254,9 @@ public class StreamInfoTest {
+
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
+
"\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
+
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
- + "\"format\":\"yyyy-MM-dd
HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+ + "\"format\":\"yyyy-MM-dd HH:mm:ss\",\"precision\":2}},"
+ + "\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}}],"
+
"\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
+
"\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
+
"\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/HiveLoadNodeTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
similarity index 94%
rename from
inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/HiveLoadNodeTest.java
rename to
inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
index a17014574..bfbe70c0f 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/HiveLoadNodeTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
@@ -15,16 +15,18 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.node;
+package org.apache.inlong.sort.protocol.node.load;
-import java.util.Arrays;
-import java.util.HashMap;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.NodeBaseTest;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import java.util.Arrays;
+import java.util.HashMap;
+
public class HiveLoadNodeTest extends NodeBaseTest {
@Override
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
index c12566755..d36750aad 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
@@ -61,7 +61,7 @@ public class DistinctNodeTest extends NodeBaseTest {
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"f2\","
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"f3\","
+
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}}],"
+
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
+
"\"name\":\"f1\",\"formatInfo\":{\"type\":\"string\"}},\"outputField\":{\"type\":\"base\","
+
"\"name\":\"f1\",\"formatInfo\":{\"type\":\"string\"}}},{\"type\":\"fieldRelationShip\","
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/WatermarkFieldTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/WatermarkFieldTest.java
index 83c866815..0320505c4 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/WatermarkFieldTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/WatermarkFieldTest.java
@@ -40,7 +40,7 @@ public class WatermarkFieldTest extends FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\",\"name\":\"ts\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+
"\"interval\":{\"type\":\"stringConstant\",\"value\":\"10\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"HOUR\",\"value\":\"HOUR\"}}";
}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
index c85197eb0..fdf6422bf 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
@@ -45,7 +45,7 @@ public class HopEndFunctionTest extends FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"hopEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
index 763056cbe..d61415838 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
@@ -45,7 +45,7 @@ public class HopFunctionTest extends FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"hop\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
index af6d3a61e..9e7796b4e 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
@@ -45,7 +45,7 @@ public class HopStartFunctionTest extends FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"hopStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
index d4102d1ac..86e28719d 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
@@ -47,7 +47,7 @@ public class MultiValueFilterFunctionTest extends
FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"multiValueFilter\",\"source\":{\"type\":\"base\",\"name\":\"field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+
"\"targets\":[{\"type\":\"stringConstant\",\"value\":\"1\"},{\"type\":\"stringConstant\","
+
"\"value\":\"2\"}],\"compareOperator\":{\"type\":\"in\"},\"logicOperator\":{\"type\":\"empty\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
index 45ae12abf..23d574d71 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
@@ -45,7 +45,7 @@ public class SessionEndFunctionTest extends FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"sessionEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
index 16e61e257..217dd7278 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
@@ -45,7 +45,7 @@ public class SessionFunctionTest extends FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"session\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
index 7c70a066d..b623f2334 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
@@ -45,7 +45,7 @@ public class SessionStartFunctionTest extends
FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"sessionStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
index b7f1a45fc..a449fa0c7 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
@@ -47,7 +47,7 @@ public class SingleValueFilterFunctionTest extends
FunctionBaseTest {
public String getExpectSerializeStr() {
return
"{\"type\":\"singleValueFilter\",\"logicOperator\":{\"type\":\"empty\"},"
+
"\"source\":{\"type\":\"base\",\"name\":\"single_value_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"compareOperator\":{\"type\":\"equal\"},"
+
"\"target\":{\"type\":\"stringConstant\",\"value\":\"123\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
index 59aed6ad6..b2ee303d6 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
@@ -45,7 +45,7 @@ public class TumbleEndFunctionTest extends FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"tumbleEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
index acb75bdc9..a228307b5 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
@@ -45,7 +45,7 @@ public class TumbleFunctionTest extends FunctionBaseTest {
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"tumble\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
index 40cd96b74..bb518710e 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
@@ -45,7 +45,7 @@ public class TumbleStartFunctionTest extends FunctionBaseTest
{
@Override
public String getExpectSerializeStr() {
return
"{\"type\":\"tumbleStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
- +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ +
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd
HH:mm:ss\",\"precision\":2}},"
+ "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
diff --git
a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index f53121b77..fb053a8e5 100644
---
a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++
b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -18,14 +18,6 @@
package org.apache.inlong.sort.formats.base;
-import static
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
-import static
org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema;
-import static org.apache.flink.util.Preconditions.checkState;
-import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -96,15 +88,21 @@ import org.apache.inlong.sort.formats.common.TimeTypeInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampTypeInfo;
import org.apache.inlong.sort.formats.common.TypeInfo;
+import org.apache.inlong.sort.formats.common.VarCharFormatInfo;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import static
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
+import static
org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema;
+import static org.apache.flink.util.Preconditions.checkState;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
/**
* A utility class for table formats.
*/
public class TableFormatUtils {
- // to support avro format, precision must be less than 3
- private static final int DEFAULT_PRECISION_FOR_TIMESTAMP = 2;
-
/**
* Returns the {@link DeserializationSchema} described by the given
* properties.
@@ -174,7 +172,7 @@ public class TableFormatUtils {
);
return deserializationSchemaFactory
- .createProjectedDeserializationSchema(properties,
fields);
+ .createProjectedDeserializationSchema(properties, fields);
}
/**
@@ -199,7 +197,7 @@ public class TableFormatUtils {
);
return serializationSchemaFactory
- .createProjectedSerializationSchema(properties, fields);
+ .createProjectedSerializationSchema(properties, fields);
}
/**
@@ -222,7 +220,7 @@ public class TableFormatUtils {
);
return tableFormatSerializerFactory
- .createFormatSerializer(properties);
+ .createFormatSerializer(properties);
}
/**
@@ -245,7 +243,7 @@ public class TableFormatUtils {
);
return tableFormatDeserializerFactory
- .createFormatDeserializer(properties);
+ .createFormatDeserializer(properties);
}
/**
@@ -326,7 +324,9 @@ public class TableFormatUtils {
*/
public static LogicalType deriveLogicalType(FormatInfo formatInfo) {
if (formatInfo instanceof StringFormatInfo) {
- return new VarCharType();
+ return new VarCharType(VarCharType.MAX_LENGTH);
+ } else if (formatInfo instanceof VarCharFormatInfo) {
+ return new VarCharType(((VarCharFormatInfo)
formatInfo).getLength());
} else if (formatInfo instanceof BooleanFormatInfo) {
return new BooleanType();
} else if (formatInfo instanceof ByteFormatInfo) {
@@ -348,9 +348,9 @@ public class TableFormatUtils {
} else if (formatInfo instanceof DateFormatInfo) {
return new DateType();
} else if (formatInfo instanceof TimestampFormatInfo) {
- return new TimestampType(DEFAULT_PRECISION_FOR_TIMESTAMP);
+ return new TimestampType(((TimestampFormatInfo)
formatInfo).getPrecision());
} else if (formatInfo instanceof LocalZonedTimestampFormatInfo) {
- return new LocalZonedTimestampType();
+ return new
LocalZonedTimestampType(((LocalZonedTimestampFormatInfo)
formatInfo).getPrecision());
} else if (formatInfo instanceof ArrayFormatInfo) {
FormatInfo elementFormatInfo = ((ArrayFormatInfo)
formatInfo).getElementFormatInfo();
return new ArrayType(deriveLogicalType(elementFormatInfo));
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
index 4272fa49d..45931cf4f 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
@@ -18,17 +18,19 @@
package org.apache.inlong.sort.formats.common;
-import java.io.Serializable;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import java.io.Serializable;
+
/**
* The format information for data types.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "string", value = StringFormatInfo.class),
+ @JsonSubTypes.Type(name = "varchar", value = VarCharFormatInfo.class),
@JsonSubTypes.Type(name = "boolean", value = BooleanFormatInfo.class),
@JsonSubTypes.Type(name = "byte", value = ByteFormatInfo.class),
@JsonSubTypes.Type(name = "short", value = ShortFormatInfo.class),
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
index 9afc8da36..952ddf94f 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
@@ -23,14 +23,24 @@ public class LocalZonedTimestampFormatInfo implements
FormatInfo {
private static final long serialVersionUID = -7501810151856898046L;
+ // to support avro format, precision must be less than 3
+ private static final int DEFAULT_PRECISION_FOR_TIMESTAMP = 2;
+
private final String format;
- public LocalZonedTimestampFormatInfo(String format) {
+ private int precision;
+
+ public LocalZonedTimestampFormatInfo(String format, int precision) {
this.format = format;
+ this.precision = precision;
}
public LocalZonedTimestampFormatInfo() {
- this("yyyy-MM-dd HH:mm:ss");
+ this("yyyy-MM-dd HH:mm:ss", DEFAULT_PRECISION_FOR_TIMESTAMP);
+ }
+
+ public LocalZonedTimestampFormatInfo(int precision) {
+ this("yyyy-MM-dd HH:mm:ss", precision);
}
public String getFormat() {
@@ -66,4 +76,11 @@ public class LocalZonedTimestampFormatInfo implements
FormatInfo {
+ '}';
}
+ public int getPrecision() {
+ return precision;
+ }
+
+ public void setPrecision(int precision) {
+ this.precision = precision;
+ }
}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
index d26373c83..12f47d245 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
@@ -18,18 +18,18 @@
package org.apache.inlong.sort.formats.common;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
import static
org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
import static
org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
@@ -41,6 +41,8 @@ public class TimestampFormatInfo implements
BasicFormatInfo<Timestamp> {
private static final long serialVersionUID = 1L;
private static final String FIELD_FORMAT = "format";
+ // to support avro format, precision must be less than 3
+ private static final int DEFAULT_PRECISION_FOR_TIMESTAMP = 2;
@JsonProperty(FIELD_FORMAT)
@Nonnull
@@ -50,12 +52,15 @@ public class TimestampFormatInfo implements
BasicFormatInfo<Timestamp> {
@Nullable
private final SimpleDateFormat simpleDateFormat;
+ @JsonProperty("precision")
+ private int precision;
+
@JsonCreator
public TimestampFormatInfo(
- @JsonProperty(FIELD_FORMAT) @Nonnull String format
- ) {
+ @JsonProperty(FIELD_FORMAT) @Nonnull String format,
+ @JsonProperty("precision") int precision) {
this.format = format;
-
+ this.precision = precision;
if (!format.equals("MICROS")
&& !format.equals("MILLIS")
&& !format.equals("SECONDS")
@@ -67,8 +72,16 @@ public class TimestampFormatInfo implements
BasicFormatInfo<Timestamp> {
}
}
+ public TimestampFormatInfo(@JsonProperty(FIELD_FORMAT) @Nonnull String
format) {
+ this(format, DEFAULT_PRECISION_FOR_TIMESTAMP);
+ }
+
public TimestampFormatInfo() {
- this("yyyy-MM-dd HH:mm:ss");
+ this("yyyy-MM-dd HH:mm:ss", DEFAULT_PRECISION_FOR_TIMESTAMP);
+ }
+
+ public TimestampFormatInfo(@JsonProperty("precision") int precision) {
+ this("yyyy-MM-dd HH:mm:ss", precision);
}
@Nonnull
@@ -160,4 +173,12 @@ public class TimestampFormatInfo implements
BasicFormatInfo<Timestamp> {
public String toString() {
return "TimestampFormatInfo{" + "format='" + format + '\'' + '}';
}
+
+ public int getPrecision() {
+ return precision;
+ }
+
+ public void setPrecision(int precision) {
+ this.precision = precision;
+ }
}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarCharFormatInfo.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarCharFormatInfo.java
new file mode 100644
index 000000000..667055aa7
--- /dev/null
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarCharFormatInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The format information for varchar, i.e. a string type that carries a length.
+ *
+ * <p>Values are plain {@link String}s: {@link #serialize(String)} returns its argument
+ * unchanged and {@link #deserialize(String)} trims surrounding whitespace. The length is
+ * only recorded here; this class does not truncate or validate values against it.
+ */
+public class VarCharFormatInfo implements BasicFormatInfo<String> {
+
+    /**
+     * Shared instance built by the no-arg constructor (length 1).
+     *
+     * <p>NOTE: the instance is mutable through {@link #setLength(int)}; changing its length
+     * affects every user of this constant, so prefer a fresh instance for other lengths.
+     */
+    public static final VarCharFormatInfo INSTANCE = new VarCharFormatInfo();
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty("length")
+    private int length;
+
+    /**
+     * Creates a varchar format with the given length.
+     *
+     * @param length the length, bound to the JSON property {@code length}
+     */
+    @JsonCreator
+    public VarCharFormatInfo(@JsonProperty("length") int length) {
+        this.length = length;
+    }
+
+    /**
+     * Creates a varchar format with a length of 1.
+     */
+    public VarCharFormatInfo() {
+        this(1);
+    }
+
+    @Override
+    public StringTypeInfo getTypeInfo() {
+        return StringTypeInfo.INSTANCE;
+    }
+
+    /**
+     * Returns the value unchanged.
+     */
+    @Override
+    public String serialize(String obj) {
+        return obj;
+    }
+
+    /**
+     * Returns the text with leading and trailing whitespace removed.
+     */
+    @Override
+    public String deserialize(String text) {
+        return text.trim();
+    }
+
+    /**
+     * Two varchar formats are equal when they have the same class and the same length.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        // The length is part of the format's identity: varchar(10) is not varchar(20).
+        return length == ((VarCharFormatInfo) o).length;
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}; it depends on the current length,
+     * so do not change the length while the instance is a key in a hash-based collection.
+     */
+    @Override
+    public int hashCode() {
+        return 31 * getClass().hashCode() + length;
+    }
+
+    @Override
+    public String toString() {
+        return "VarCharFormatInfo{length=" + length + '}';
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    public void setLength(int length) {
+        this.length = length;
+    }
+}
diff --git
a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarCharFormatInfoTest.java
b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarCharFormatInfoTest.java
new file mode 100644
index 000000000..48eaf1dd4
--- /dev/null
+++
b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarCharFormatInfoTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link VarCharFormatInfo}.
+ */
+public class VarCharFormatInfoTest extends FormatInfoTestBase {
+
+    /**
+     * Supplies the format infos handed to {@link FormatInfoTestBase}; the checks that base
+     * class runs on them are defined outside this file.
+     */
+    @Override
+    Collection<FormatInfo> createFormatInfos() {
+        return Collections.singletonList(VarCharFormatInfo.INSTANCE);
+    }
+
+    /**
+     * Serialization must hand the value back unchanged.
+     */
+    @Test
+    public void testSerialize() {
+        assertEquals("123", VarCharFormatInfo.INSTANCE.serialize("123"));
+    }
+
+    /**
+     * Deserialization must return the text with surrounding whitespace removed.
+     */
+    @Test
+    public void testDeserialize() {
+        assertEquals("123", VarCharFormatInfo.INSTANCE.deserialize("123"));
+        // deserialize() trims its input, so padded text must yield the same value.
+        assertEquals("123", VarCharFormatInfo.INSTANCE.deserialize("  123  "));
+    }
+}