This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bcfad5446 [INLONG-3826][Sort] Enhance field format to support varchar 
types and timestamp of different precisions (#3984)
bcfad5446 is described below

commit bcfad5446a07bad01b4e565bfaa82a675adc15de
Author: yunqingmoswu <[email protected]>
AuthorDate: Thu Apr 28 10:04:23 2022 +0800

    [INLONG-3826][Sort] Enhance field format to support varchar types and 
timestamp of different precisions (#3984)
---
 .../apache/inlong/sort/protocol/node/LoadNode.java | 10 +--
 .../sort/protocol/node/load/HiveLoadNode.java      | 29 ++++++--
 .../apache/inlong/sort/protocol/GroupInfoTest.java | 28 ++++---
 .../inlong/sort/protocol/StreamInfoTest.java       | 45 ++++++------
 .../protocol/node/{ => load}/HiveLoadNodeTest.java | 10 ++-
 .../protocol/node/transform/DistinctNodeTest.java  |  2 +-
 .../transformation/WatermarkFieldTest.java         |  2 +-
 .../function/HopEndFunctionTest.java               |  2 +-
 .../transformation/function/HopFunctionTest.java   |  2 +-
 .../function/HopStartFunctionTest.java             |  2 +-
 .../function/MultiValueFilterFunctionTest.java     |  2 +-
 .../function/SessionEndFunctionTest.java           |  2 +-
 .../function/SessionFunctionTest.java              |  2 +-
 .../function/SessionStartFunctionTest.java         |  2 +-
 .../function/SingleValueFilterFunctionTest.java    |  2 +-
 .../function/TumbleEndFunctionTest.java            |  2 +-
 .../function/TumbleFunctionTest.java               |  2 +-
 .../function/TumbleStartFunctionTest.java          |  2 +-
 .../inlong/sort/formats/base/TableFormatUtils.java | 36 ++++-----
 .../inlong/sort/formats/common/FormatInfo.java     |  4 +-
 .../common/LocalZonedTimestampFormatInfo.java      | 21 +++++-
 .../sort/formats/common/TimestampFormatInfo.java   | 41 ++++++++---
 .../sort/formats/common/VarCharFormatInfo.java     | 85 ++++++++++++++++++++++
 .../sort/formats/common/VarCharFormatInfoTest.java | 46 ++++++++++++
 24 files changed, 290 insertions(+), 91 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 94cc5202f..332700a33 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -18,10 +18,6 @@
 package org.apache.inlong.sort.protocol.node;
 
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -36,6 +32,11 @@ import 
org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 @JsonTypeInfo(
         use = JsonTypeInfo.Id.NAME,
         include = JsonTypeInfo.As.PROPERTY,
@@ -88,5 +89,4 @@ public abstract class LoadNode implements Node {
         this.sinkParallelism = sinkParallelism;
         this.properties = properties;
     }
-
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
index e5a2c6be9..71e26b140 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
@@ -18,22 +18,25 @@
 package org.apache.inlong.sort.protocol.node.load;
 
 import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.formats.common.LocalZonedTimestampFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
 @EqualsAndHashCode(callSuper = true)
 @JsonTypeName("hiveLoad")
 @Data
@@ -96,6 +99,22 @@ public class HiveLoadNode extends LoadNode implements 
Serializable {
         this.hiveVersion = Preconditions.checkNotNull(hiveVersion, "version of 
hive is null");
         this.hadoopConfDir = hadoopConfDir;
         this.partitionFields = partitionFields;
+        handleTimestampField();
+    }
+
+    /**
+     * Dealing with problems caused by time precision in hive
+     * Hive connector requires the time precision of timestamp and localzoned 
timestamp  must be 9
+     */
+    private void handleTimestampField() {
+        getFields().forEach(f -> {
+            if (f.getFormatInfo() instanceof TimestampFormatInfo) {
+                ((TimestampFormatInfo) f.getFormatInfo()).setPrecision(9);
+            }
+            if (f.getFormatInfo() instanceof LocalZonedTimestampFormatInfo) {
+                ((LocalZonedTimestampFormatInfo) 
f.getFormatInfo()).setPrecision(9);
+            }
+        });
     }
 
     @Override
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
index 0e0c20f69..ef99f1d1f 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
@@ -110,10 +110,12 @@ public class GroupInfoTest {
                         + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
                         + 
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
                         + 
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
-                        + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                        + "\"formatInfo\":{\"type\":\"timestamp\","
+                        + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}],"
                         + 
"\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
                         + 
"\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
-                        + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},\"interval\":{\"type\":\"stringConstant\","
+                        + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
+                        + "\"interval\":{\"type\":\"stringConstant\","
                         + 
"\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
                         + 
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\","
                         + 
"\"tableNames\":[\"table\"],\"hostname\":\"localhost\",\"username\":\"username\","
@@ -124,7 +126,8 @@ public class GroupInfoTest {
                         + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
                         + 
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
                         + 
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
-                        + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                        + "\"formatInfo\":{\"type\":\"timestamp\","
+                        + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}],"
                         + 
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
                         + 
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
                         + 
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
@@ -134,9 +137,10 @@ public class GroupInfoTest {
                         + 
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
                         + 
"\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
                         + 
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
-                        + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
-                        + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
-                        + 
"\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
+                        + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
+                        + "\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+                        + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\","
+                        + 
"\"precision\":2}}}],\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
                         + 
"\"format\":{\"type\":\"jsonFormat\",\"failOnMissingField\":false,\"ignoreParseErrors\":true,"
                         + 
"\"timestampFormatStandard\":\"SQL\",\"mapNullKeyMode\":\"DROP\","
                         + 
"\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
@@ -165,9 +169,10 @@ public class GroupInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
                 + 
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
                 + 
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}],"
                 + 
"\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
-                + 
"\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},"
+                + "\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+                + "\"format\":\"yyyy-MM-dd HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + "\"timeUnit\":{\"type\":\"timeUnitConstant\","
                 + 
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
@@ -178,7 +183,7 @@ public class GroupInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
                 + 
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
                 + 
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}],"
                 + 
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
                 + 
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
                 + 
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
@@ -188,8 +193,9 @@ public class GroupInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
                 + 
"\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
                 + 
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
-                + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+                + "\"format\":\"yyyy-MM-dd HH:mm:ss\",\"precision\":2}},"
+                + "\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}}],"
                 + 
"\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
                 + 
"\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
                 + 
"\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
index 0063b7b1e..3679dc58b 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.inlong.sort.protocol;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.sort.formats.common.FloatFormatInfo;
@@ -43,6 +37,12 @@ import 
org.apache.inlong.sort.protocol.transformation.WatermarkField;
 import 
org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import static org.junit.Assert.assertEquals;
+
 /**
  * StreamInfo unit test class
  */
@@ -130,9 +130,9 @@ public class StreamInfoTest {
                 + 
"base\",\"name\":\"age\",\"formatInfo\":{\"type\":\"int\"}},{\"type\":"
                 + 
"\"base\",\"name\":\"salary\",\"formatInfo\":{\"type\":\"float\"}},"
                 + 
"{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
-                + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}}],\"watermarkField\":{\"type\":\"watermark\","
+                + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}],\"watermarkField\":{\"type\":\"watermark\","
                 + 
"\"timeAttr\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":"
-                + "\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},\"interval\":{\"type\":"
+                + "\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},\"interval\":{\"type\":"
                 + 
"\"stringConstant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
                 + 
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\","
                 + 
"\"tableNames\":[\"table\"],\"hostname\":\"localhost\",\"username\":\"username\","
@@ -143,7 +143,7 @@ public class StreamInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
                 + 
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
                 + 
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":9}}],"
                 + 
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{"
                 + 
"\"type\":\"base\",\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},"
                 + 
"\"outputField\":{\"type\":\"base\",\"name\":\"id\",\"formatInfo\":{\"type\":"
@@ -154,9 +154,9 @@ public class StreamInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\","
                 + 
"\"name\":\"age\",\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
                 + 
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":"
-                + "\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},\"outputField\":{\"type\":"
+                + "\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},\"outputField\":{\"type\":"
                 + 
"\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":"
-                + "\"yyyy-MM-dd 
HH:mm:ss\"}}}],\"sinkParallelism\":1,\"catalogName\":\"myHive\","
+                + "\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}}],\"sinkParallelism\":1,\"catalogName\":\"myHive\","
                 + 
"\"database\":\"default\",\"tableName\":\"test\",\"hiveConfDir\":\"/opt/hive-conf\","
                 + 
"\"hiveVersion\":\"3.1.2\",\"hadoopConfDir\":null,\"partitionFields\":[{\"type\":"
                 + 
"\"base\",\"name\":\"day\",\"formatInfo\":{\"type\":\"long\"}}]}],\"relations\":"
@@ -178,9 +178,9 @@ public class StreamInfoTest {
                 + 
"base\",\"name\":\"age\",\"formatInfo\":{\"type\":\"int\"}},{\"type\":"
                 + 
"\"base\",\"name\":\"salary\",\"formatInfo\":{\"type\":\"float\"}},"
                 + 
"{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
-                + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}}],\"watermarkField\":{\"type\":\"watermark\","
+                + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":9}}],\"watermarkField\":{\"type\":\"watermark\","
                 + 
"\"timeAttr\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":"
-                + "\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},\"interval\":{\"type\":"
+                + "\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":9}},\"interval\":{\"type\":"
                 + 
"\"stringConstant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
                 + 
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\","
                 + 
"\"tableNames\":[\"table\"],\"hostname\":\"localhost\",\"username\":\"username\","
@@ -191,7 +191,7 @@ public class StreamInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
                 + 
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
                 + 
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":9}}],"
                 + 
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{"
                 + 
"\"type\":\"base\",\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},"
                 + 
"\"outputField\":{\"type\":\"base\",\"name\":\"id\",\"formatInfo\":{\"type\":"
@@ -202,9 +202,9 @@ public class StreamInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\","
                 + 
"\"name\":\"age\",\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
                 + 
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":"
-                + "\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},\"outputField\":{\"type\":"
+                + "\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":9}},\"outputField\":{\"type\":"
                 + 
"\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":"
-                + "\"yyyy-MM-dd 
HH:mm:ss\"}}}],\"sinkParallelism\":1,\"catalogName\":\"myHive\","
+                + "\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":9}}}],\"sinkParallelism\":1,\"catalogName\":\"myHive\","
                 + 
"\"database\":\"default\",\"tableName\":\"test\",\"hiveConfDir\":\"/opt/hive-conf\","
                 + 
"\"hiveVersion\":\"3.1.2\",\"hadoopConfDir\":null,\"partitionFields\":[{\"type\":"
                 + 
"\"base\",\"name\":\"day\",\"formatInfo\":{\"type\":\"long\"}}]}],\"relations\":"
@@ -231,10 +231,10 @@ public class StreamInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
                 + 
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
                 + 
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}],"
                 + 
"\"watermarkField\":{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\","
-                + 
"\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},"
-                + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
+                + 
"\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\","
+                + 
"\"precision\":2}},\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + "\"timeUnit\":{\"type\":\"timeUnitConstant\","
                 + 
"\"timeUnit\":\"MINUTE\",\"value\":\"MINUTE\"}},\"primaryKey\":\"id\",\"tableNames\":[\"table\"],"
                 + 
"\"hostname\":\"localhost\",\"username\":\"username\",\"password\":\"username\","
@@ -244,7 +244,7 @@ public class StreamInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"age\","
                 + 
"\"formatInfo\":{\"type\":\"int\"}},{\"type\":\"base\",\"name\":\"salary\","
                 + 
"\"formatInfo\":{\"type\":\"float\"}},{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}],"
                 + 
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
                 + 
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}},\"outputField\":{\"type\":\"base\","
                 + 
"\"name\":\"id\",\"formatInfo\":{\"type\":\"long\"}}},{\"type\":\"fieldRelationShip\","
@@ -254,8 +254,9 @@ public class StreamInfoTest {
                 + 
"\"formatInfo\":{\"type\":\"int\"}},\"outputField\":{\"type\":\"base\",\"name\":\"age\","
                 + 
"\"formatInfo\":{\"type\":\"int\"}}},{\"type\":\"fieldRelationShip\","
                 + 
"\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
-                + "\"format\":\"yyyy-MM-dd 
HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
+                + "\"format\":\"yyyy-MM-dd HH:mm:ss\",\"precision\":2}},"
+                + "\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}}],"
                 + 
"\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
                 + 
"\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
                 + 
"\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/HiveLoadNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
similarity index 94%
rename from 
inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/HiveLoadNodeTest.java
rename to 
inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
index a17014574..bfbe70c0f 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/HiveLoadNodeTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.node;
+package org.apache.inlong.sort.protocol.node.load;
 
-import java.util.Arrays;
-import java.util.HashMap;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.NodeBaseTest;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
 
+import java.util.Arrays;
+import java.util.HashMap;
+
 public class HiveLoadNodeTest extends NodeBaseTest {
 
     @Override
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
index c12566755..d36750aad 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
@@ -61,7 +61,7 @@ public class DistinctNodeTest extends NodeBaseTest {
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"f2\","
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"f3\","
                 + 
"\"formatInfo\":{\"type\":\"string\"}},{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}],"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}}],"
                 + 
"\"fieldRelationShips\":[{\"type\":\"fieldRelationShip\",\"inputField\":{\"type\":\"base\","
                 + 
"\"name\":\"f1\",\"formatInfo\":{\"type\":\"string\"}},\"outputField\":{\"type\":\"base\","
                 + 
"\"name\":\"f1\",\"formatInfo\":{\"type\":\"string\"}}},{\"type\":\"fieldRelationShip\","
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/WatermarkFieldTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/WatermarkFieldTest.java
index 83c866815..0320505c4 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/WatermarkFieldTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/WatermarkFieldTest.java
@@ -40,7 +40,7 @@ public class WatermarkFieldTest extends FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"watermark\",\"timeAttr\":{\"type\":\"base\",\"name\":\"ts\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + 
"\"interval\":{\"type\":\"stringConstant\",\"value\":\"10\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"HOUR\",\"value\":\"HOUR\"}}";
     }
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
index c85197eb0..fdf6422bf 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
@@ -45,7 +45,7 @@ public class HopEndFunctionTest extends FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"hopEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
index 763056cbe..d61415838 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
@@ -45,7 +45,7 @@ public class HopFunctionTest extends FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"hop\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
index af6d3a61e..9e7796b4e 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
@@ -45,7 +45,7 @@ public class HopStartFunctionTest extends FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"hopStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
index d4102d1ac..86e28719d 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
@@ -47,7 +47,7 @@ public class MultiValueFilterFunctionTest extends 
FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"multiValueFilter\",\"source\":{\"type\":\"base\",\"name\":\"field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + 
"\"targets\":[{\"type\":\"stringConstant\",\"value\":\"1\"},{\"type\":\"stringConstant\","
                 + 
"\"value\":\"2\"}],\"compareOperator\":{\"type\":\"in\"},\"logicOperator\":{\"type\":\"empty\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
index 45ae12abf..23d574d71 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
@@ -45,7 +45,7 @@ public class SessionEndFunctionTest extends FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"sessionEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
index 16e61e257..217dd7278 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
@@ -45,7 +45,7 @@ public class SessionFunctionTest extends FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"session\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
index 7c70a066d..b623f2334 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
@@ -45,7 +45,7 @@ public class SessionStartFunctionTest extends 
FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"sessionStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
index b7f1a45fc..a449fa0c7 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
@@ -47,7 +47,7 @@ public class SingleValueFilterFunctionTest extends 
FunctionBaseTest {
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"singleValueFilter\",\"logicOperator\":{\"type\":\"empty\"},"
                 + 
"\"source\":{\"type\":\"base\",\"name\":\"single_value_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"compareOperator\":{\"type\":\"equal\"},"
                 + 
"\"target\":{\"type\":\"stringConstant\",\"value\":\"123\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
index 59aed6ad6..b2ee303d6 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
@@ -45,7 +45,7 @@ public class TumbleEndFunctionTest extends FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"tumbleEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
index acb75bdc9..a228307b5 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
@@ -45,7 +45,7 @@ public class TumbleFunctionTest extends FunctionBaseTest {
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"tumble\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
index 40cd96b74..bb518710e 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
@@ -45,7 +45,7 @@ public class TumbleStartFunctionTest extends FunctionBaseTest 
{
     @Override
     public String getExpectSerializeStr() {
         return 
"{\"type\":\"tumbleStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
-                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + 
"\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd 
HH:mm:ss\",\"precision\":2}},"
                 + "\"interval\":{\"type\":\"stringConstant\",\"value\":\"1\"},"
                 + 
"\"timeUnit\":{\"type\":\"timeUnitConstant\",\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
 
diff --git 
a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
 
b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index f53121b77..fb053a8e5 100644
--- 
a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ 
b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -18,14 +18,6 @@
 
 package org.apache.inlong.sort.formats.base;
 
-import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
-import static 
org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema;
-import static org.apache.flink.util.Preconditions.checkState;
-import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -96,15 +88,21 @@ import org.apache.inlong.sort.formats.common.TimeTypeInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampTypeInfo;
 import org.apache.inlong.sort.formats.common.TypeInfo;
+import org.apache.inlong.sort.formats.common.VarCharFormatInfo;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
+import static 
org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema;
+import static org.apache.flink.util.Preconditions.checkState;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
 
 /**
  * A utility class for table formats.
  */
 public class TableFormatUtils {
 
-    // to support avro format, precision must be less than 3
-    private static final int DEFAULT_PRECISION_FOR_TIMESTAMP = 2;
-
     /**
      * Returns the {@link DeserializationSchema} described by the given
      * properties.
@@ -174,7 +172,7 @@ public class TableFormatUtils {
                 );
 
         return deserializationSchemaFactory
-                       .createProjectedDeserializationSchema(properties, 
fields);
+                .createProjectedDeserializationSchema(properties, fields);
     }
 
     /**
@@ -199,7 +197,7 @@ public class TableFormatUtils {
                 );
 
         return serializationSchemaFactory
-                       .createProjectedSerializationSchema(properties, fields);
+                .createProjectedSerializationSchema(properties, fields);
     }
 
     /**
@@ -222,7 +220,7 @@ public class TableFormatUtils {
                 );
 
         return tableFormatSerializerFactory
-                       .createFormatSerializer(properties);
+                .createFormatSerializer(properties);
     }
 
     /**
@@ -245,7 +243,7 @@ public class TableFormatUtils {
                 );
 
         return tableFormatDeserializerFactory
-                       .createFormatDeserializer(properties);
+                .createFormatDeserializer(properties);
     }
 
     /**
@@ -326,7 +324,9 @@ public class TableFormatUtils {
      */
     public static LogicalType deriveLogicalType(FormatInfo formatInfo) {
         if (formatInfo instanceof StringFormatInfo) {
-            return new VarCharType();
+            return new VarCharType(VarCharType.MAX_LENGTH);
+        } else if (formatInfo instanceof VarCharFormatInfo) {
+            return new VarCharType(((VarCharFormatInfo) 
formatInfo).getLength());
         } else if (formatInfo instanceof BooleanFormatInfo) {
             return new BooleanType();
         } else if (formatInfo instanceof ByteFormatInfo) {
@@ -348,9 +348,9 @@ public class TableFormatUtils {
         } else if (formatInfo instanceof DateFormatInfo) {
             return new DateType();
         } else if (formatInfo instanceof TimestampFormatInfo) {
-            return new TimestampType(DEFAULT_PRECISION_FOR_TIMESTAMP);
+            return new TimestampType(((TimestampFormatInfo) 
formatInfo).getPrecision());
         } else if (formatInfo instanceof LocalZonedTimestampFormatInfo) {
-            return new LocalZonedTimestampType();
+            return new 
LocalZonedTimestampType(((LocalZonedTimestampFormatInfo) 
formatInfo).getPrecision());
         } else if (formatInfo instanceof ArrayFormatInfo) {
             FormatInfo elementFormatInfo = ((ArrayFormatInfo) 
formatInfo).getElementFormatInfo();
             return new ArrayType(deriveLogicalType(elementFormatInfo));
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
index 4272fa49d..45931cf4f 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/FormatInfo.java
@@ -18,17 +18,19 @@
 
 package org.apache.inlong.sort.formats.common;
 
-import java.io.Serializable;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 
+import java.io.Serializable;
+
 /**
  * The format information for data types.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(name = "string", value = StringFormatInfo.class),
+        @JsonSubTypes.Type(name = "varchar", value = VarCharFormatInfo.class),
         @JsonSubTypes.Type(name = "boolean", value = BooleanFormatInfo.class),
         @JsonSubTypes.Type(name = "byte", value = ByteFormatInfo.class),
         @JsonSubTypes.Type(name = "short", value = ShortFormatInfo.class),
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
index 9afc8da36..952ddf94f 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/LocalZonedTimestampFormatInfo.java
@@ -23,14 +23,24 @@ public class LocalZonedTimestampFormatInfo implements 
FormatInfo {
 
     private static final long serialVersionUID = -7501810151856898046L;
 
+    // to support avro format, precision must be less than 3
+    private static final int DEFAULT_PRECISION_FOR_TIMESTAMP = 2;
+
     private final String format;
 
-    public LocalZonedTimestampFormatInfo(String format) {
+    private int precision;
+
+    public LocalZonedTimestampFormatInfo(String format, int precision) {
         this.format = format;
+        this.precision = precision;
     }
 
     public LocalZonedTimestampFormatInfo() {
-        this("yyyy-MM-dd HH:mm:ss");
+        this("yyyy-MM-dd HH:mm:ss", DEFAULT_PRECISION_FOR_TIMESTAMP);
+    }
+
+    public LocalZonedTimestampFormatInfo(int precision) {
+        this("yyyy-MM-dd HH:mm:ss", precision);
     }
 
     public String getFormat() {
@@ -66,4 +76,11 @@ public class LocalZonedTimestampFormatInfo implements 
FormatInfo {
                 + '}';
     }
 
+    public int getPrecision() {
+        return precision;
+    }
+
+    public void setPrecision(int precision) {
+        this.precision = precision;
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
index d26373c83..12f47d245 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/TimestampFormatInfo.java
@@ -18,18 +18,18 @@
 
 package org.apache.inlong.sort.formats.common;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
 import static 
org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_ISO_8601;
 import static 
org.apache.inlong.sort.formats.common.Constants.DATE_AND_TIME_STANDARD_SQL;
 
@@ -41,6 +41,8 @@ public class TimestampFormatInfo implements 
BasicFormatInfo<Timestamp> {
     private static final long serialVersionUID = 1L;
 
     private static final String FIELD_FORMAT = "format";
+    // to support avro format, precision must be less than 3
+    private static final int DEFAULT_PRECISION_FOR_TIMESTAMP = 2;
 
     @JsonProperty(FIELD_FORMAT)
     @Nonnull
@@ -50,12 +52,15 @@ public class TimestampFormatInfo implements 
BasicFormatInfo<Timestamp> {
     @Nullable
     private final SimpleDateFormat simpleDateFormat;
 
+    @JsonProperty("precision")
+    private int precision;
+
     @JsonCreator
     public TimestampFormatInfo(
-            @JsonProperty(FIELD_FORMAT) @Nonnull String format
-    ) {
+            @JsonProperty(FIELD_FORMAT) @Nonnull String format,
+            @JsonProperty("precision") int precision) {
         this.format = format;
-
+        this.precision = precision;
         if (!format.equals("MICROS")
                 && !format.equals("MILLIS")
                 && !format.equals("SECONDS")
@@ -67,8 +72,16 @@ public class TimestampFormatInfo implements 
BasicFormatInfo<Timestamp> {
         }
     }
 
+    public TimestampFormatInfo(@JsonProperty(FIELD_FORMAT) @Nonnull String 
format) {
+        this(format, DEFAULT_PRECISION_FOR_TIMESTAMP);
+    }
+
     public TimestampFormatInfo() {
-        this("yyyy-MM-dd HH:mm:ss");
+        this("yyyy-MM-dd HH:mm:ss", DEFAULT_PRECISION_FOR_TIMESTAMP);
+    }
+
+    public TimestampFormatInfo(@JsonProperty("precision") int precision) {
+        this("yyyy-MM-dd HH:mm:ss", precision);
     }
 
     @Nonnull
@@ -160,4 +173,12 @@ public class TimestampFormatInfo implements 
BasicFormatInfo<Timestamp> {
     public String toString() {
         return "TimestampFormatInfo{" + "format='" + format + '\'' + '}';
     }
+
+    public int getPrecision() {
+        return precision;
+    }
+
+    public void setPrecision(int precision) {
+        this.precision = precision;
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarCharFormatInfo.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarCharFormatInfo.java
new file mode 100644
index 000000000..667055aa7
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/common/VarCharFormatInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The format information for varchar.
+ */
+public class VarCharFormatInfo implements BasicFormatInfo<String> {
+
+    public static final VarCharFormatInfo INSTANCE = new VarCharFormatInfo();
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty("length")
+    private int length;
+
+    @JsonCreator
+    public VarCharFormatInfo(@JsonProperty("length") int length) {
+        this.length = length;
+    }
+
+    public VarCharFormatInfo() {
+        this(1);
+    }
+
+    @Override
+    public StringTypeInfo getTypeInfo() {
+        return StringTypeInfo.INSTANCE;
+    }
+
+    @Override
+    public String serialize(String obj) {
+        return obj;
+    }
+
+    @Override
+    public String deserialize(String text) {
+        return text.trim();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "VarCharFormatInfo";
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    public void setLength(int length) {
+        this.length = length;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarCharFormatInfoTest.java
 
b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarCharFormatInfoTest.java
new file mode 100644
index 000000000..48eaf1dd4
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/VarCharFormatInfoTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.common;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link VarCharFormatInfo}.
+ */
+public class VarCharFormatInfoTest extends FormatInfoTestBase {
+
+    @Override
+    Collection<FormatInfo> createFormatInfos() {
+        return Collections.singletonList(VarCharFormatInfo.INSTANCE);
+    }
+
+    @Test
+    public void testSerialize() {
+        assertEquals("123", VarCharFormatInfo.INSTANCE.serialize("123"));
+    }
+
+    @Test
+    public void testDeserialize() {
+        assertEquals("123", VarCharFormatInfo.INSTANCE.deserialize("123"));
+    }
+}

Reply via email to