This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b2aa4412 [INLONG-8222][Manager] Support different data types mapping 
for different data sources by the strategy pattern (#8223)
24b2aa4412 is described below

commit 24b2aa44126af8be5f75614349539d60d64c1408
Author: chestnufang <[email protected]>
AuthorDate: Thu Jun 15 15:19:01 2023 +0800

    [INLONG-8222][Manager] Support different data types mapping for different 
data sources by the strategy pattern (#8223)
    
    Co-authored-by: chestnufang <[email protected]>
---
 .../fieldtype/datasource/BaseFieldTypeMapping.java |  38 +++++
 .../datasource/PostgreSQLFieldTypeMapping.java     | 168 +++++++++++++++++++++
 .../strategy/DefaultFieldTypeStrategy.java         |  29 ++++
 .../strategy/FieldTypeMappingStrategy.java         |  32 ++++
 .../strategy/PostgreSQLFieldTypeStrategy.java      |  31 ++++
 .../pojo/sort/node/base/ExtractNodeProvider.java   |  18 ++-
 .../pojo/sort/node/base/LoadNodeProvider.java      |  17 ++-
 .../sort/node/provider/PostgreSQLProvider.java     |   8 +-
 .../manager/pojo/sort/util/FieldInfoUtils.java     |  34 ++++-
 .../manager/pojo/sort/util/FieldInfoUtilsTest.java |  45 ++++++
 10 files changed, 411 insertions(+), 9 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/BaseFieldTypeMapping.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/BaseFieldTypeMapping.java
new file mode 100644
index 0000000000..9211914e91
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/BaseFieldTypeMapping.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.datasource;
+
+/**
+ * A field type mapping entry: a source type paired with a target type.
+ */
+public interface BaseFieldTypeMapping {
+
+    /**
+     * Get the source field type of this mapping entry.
+     *
+     * @return the source field type
+     */
+    String getSourceType();
+
+    /**
+     * Get the target (inlong) field type of this mapping entry.
+     *
+     * @return the target field type
+     */
+    String getTargetType();
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/PostgreSQLFieldTypeMapping.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/PostgreSQLFieldTypeMapping.java
new file mode 100644
index 0000000000..9f0e46655c
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/PostgreSQLFieldTypeMapping.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.datasource;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
+
+/**
+ * The enum of PostgreSQL field type mapping.
+ */
+public enum PostgreSQLFieldTypeMapping implements BaseFieldTypeMapping {
+
+    /**
+     * SMALLINT TYPE
+     */
+    SMALLINT("SMALLINT", "SMALLINT"),
+
+    INT2("INT2", "SMALLINT"),
+
+    SMALL_SERIAL("SMALLSERIAL", "SMALLINT"),
+
+    SERIAL2("SERIAL2", "SMALLINT"),
+
+    /**
+     * INT TYPE (except INT8 at the end of this group, which maps to BIGINT)
+     */
+    SERIAL("SERIAL", "INT"),
+
+    INT4("INT4", "INT"),
+
+    INT("INT", "INT"),
+
+    INTEGER("INTEGER", "INT"),
+
+    INT8("INT8", "BIGINT"),
+
+    /**
+     * BIGINT TYPE
+     */
+    BIGINT("BIGINT", "BIGINT"),
+
+    BIGSERIAL("BIGSERIAL", "BIGINT"),
+
+    /**
+     * FLOAT TYPE
+     */
+    REAL("REAL", "FLOAT"),
+
+    FLOAT4("FLOAT4", "FLOAT"),
+
+    /**
+     * DOUBLE TYPE
+     */
+    FLOAT8("FLOAT8", "DOUBLE"),
+
+    DOUBLE("DOUBLE", "DOUBLE"),
+
+    DOUBLE_PRECISION("DOUBLE PRECISION", "DOUBLE"),
+
+    /**
+     * DECIMAL TYPE
+     */
+    NUMERIC("NUMERIC", "DECIMAL"),
+
+    DECIMAL("DECIMAL", "DECIMAL"),
+
+    /**
+     * BOOLEAN TYPE
+     */
+    BOOLEAN("BOOLEAN", "BOOLEAN"),
+
+    /**
+     * DATE TYPE
+     */
+    DATE("DATE", "DATE"),
+
+    /**
+     * TIME AND TIMESTAMP TYPE
+     */
+    TIME("TIME", "TIME"),
+
+    TIMESTAMP("TIMESTAMP", "TIMESTAMP"),
+
+    /**
+     * STRING TYPE
+     */
+    CHAR("CHAR", "STRING"),
+
+    CHARACTER("CHARACTER", "STRING"),
+
+    VARCHAR("VARCHAR", "STRING"),
+
+    CHARACTER_VARYING("CHARACTER VARYING", "STRING"),
+
+    TEXT("TEXT", "STRING"),
+
+    /**
+     * BYTES TYPE (mapped to VARBINARY)
+     */
+    BYTEA("BYTEA", "VARBINARY"),
+
+    /**
+     * ARRAY TYPE
+     */
+    ARRAY("ARRAY", "ARRAY");
+
+    /**
+     * The source data field type
+     */
+    private final String sourceType;
+
+    /**
+     * The target data field type
+     */
+    private final String targetType;
+
+    PostgreSQLFieldTypeMapping(String sourceType, String targetType) {
+        this.sourceType = sourceType;
+        this.targetType = targetType;
+    }
+
+    @Override
+    public String getSourceType() {
+        return sourceType;
+    }
+
+    @Override
+    public String getTargetType() {
+        return targetType;
+    }
+
+    private static final Map<String, String> FIELD_TYPE_MAPPING_MAP = new 
HashMap<>();
+
+    static {
+        Stream.of(values()).forEach(v -> 
FIELD_TYPE_MAPPING_MAP.put(v.getSourceType(), v.getTargetType()));
+    }
+
+    /**
+     * Get the inlong field type mapped to the given PostgreSQL field type.
+     *
+     * <p>The lookup key is the part of the source type before the first
+     * {@code LEFT_BRACKET}, with surrounding whitespace removed and letter
+     * case ignored. A type that has no mapping entry is returned
+     * upper-cased but otherwise unchanged.
+     *
+     * @param sourceType the source field type, may be null
+     * @return the target field type of inlong field type mapping, or null
+     *         if the source field type is null
+     */
+    public static String getFieldTypeMapping(String sourceType) {
+        if (sourceType == null) {
+            // Pass null through rather than failing with an NPE below.
+            return null;
+        }
+        // Locale.ROOT keeps the key stable on JVMs whose default locale
+        // has special casing rules (e.g. the Turkish dotted/dotless i).
+        String dataType = StringUtils
+                .substringBefore(sourceType, LEFT_BRACKET)
+                .trim()
+                .toUpperCase(java.util.Locale.ROOT);
+        String fallback = sourceType.toUpperCase(java.util.Locale.ROOT);
+        return FIELD_TYPE_MAPPING_MAP.getOrDefault(dataType, fallback);
+    }
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/DefaultFieldTypeStrategy.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/DefaultFieldTypeStrategy.java
new file mode 100644
index 0000000000..21716952d6
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/DefaultFieldTypeStrategy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.strategy;
+
+/**
+ * The default field type mapping strategy: returns the source type as is.
+ */
+public class DefaultFieldTypeStrategy implements FieldTypeMappingStrategy {
+
+    @Override
+    public String getFieldTypeMapping(String sourceType) {
+        return sourceType;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/FieldTypeMappingStrategy.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/FieldTypeMappingStrategy.java
new file mode 100644
index 0000000000..b456170bb6
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/FieldTypeMappingStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.strategy;
+
+/**
+ * Strategy for mapping the field types of a data source to inlong types.
+ */
+public interface FieldTypeMappingStrategy {
+
+    /**
+     * Get the inlong field type mapped to the given source field type.
+     *
+     * @param sourceType the source field type
+     * @return the target field type of inlong field type mapping
+     */
+    String getFieldTypeMapping(String sourceType);
+}
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/PostgreSQLFieldTypeStrategy.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/PostgreSQLFieldTypeStrategy.java
new file mode 100644
index 0000000000..235bdb5db5
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/PostgreSQLFieldTypeStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.strategy;
+
+import 
org.apache.inlong.manager.common.fieldtype.datasource.PostgreSQLFieldTypeMapping;
+
+/**
+ * PostgreSQL strategy that delegates to {@link PostgreSQLFieldTypeMapping}.
+ */
+public class PostgreSQLFieldTypeStrategy implements FieldTypeMappingStrategy {
+
+    @Override
+    public String getFieldTypeMapping(String sourceType) {
+        return PostgreSQLFieldTypeMapping.getFieldTypeMapping(sourceType);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index c5aea94d08..3642cc7a3e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.pojo.sort.node.base;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
+import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
@@ -60,8 +61,23 @@ public interface ExtractNodeProvider extends NodeProvider {
      */
     default List<FieldInfo> parseStreamFieldInfos(List<StreamField> 
streamFields, String nodeId) {
         // Filter constant fields
+        return parseStreamFieldInfos(streamFields, nodeId, null);
+    }
+
+    /**
+     * Parse StreamFieldInfos
+     *
+     * @param streamFields The stream fields
+     * @param nodeId The node id
+     * @param fieldTypeMappingStrategy The field type mapping operation 
strategy
+     * @return FieldInfo list
+     */
+    default List<FieldInfo> parseStreamFieldInfos(List<StreamField> 
streamFields, String nodeId,
+            FieldTypeMappingStrategy fieldTypeMappingStrategy) {
+        // Filter constant fields
         return streamFields.stream().filter(s -> 
Objects.isNull(s.getFieldValue()))
-                .map(streamFieldInfo -> 
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
+                .map(streamFieldInfo -> FieldInfoUtils
+                        .parseStreamFieldInfo(streamFieldInfo, nodeId, 
fieldTypeMappingStrategy))
                 .collect(Collectors.toList());
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index 2dd27f3416..2df89d9710 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.node.base;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
+import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -65,7 +66,21 @@ public interface LoadNodeProvider extends NodeProvider {
      * @return FieldInfo list
      */
     default List<FieldInfo> parseSinkFieldInfos(List<SinkField> sinkFields, 
String nodeId) {
-        return sinkFields.stream().map(field -> 
FieldInfoUtils.parseSinkFieldInfo(field, nodeId))
+        return parseSinkFieldInfos(sinkFields, nodeId, null);
+    }
+
+    /**
+     * Parse FieldInfos
+     *
+     * @param sinkFields The stream sink fields
+     * @param nodeId The node id
+     * @param fieldTypeMappingStrategy The field type mapping operation 
strategy
+     * @return FieldInfo list
+     */
+    default List<FieldInfo> parseSinkFieldInfos(List<SinkField> sinkFields, 
String nodeId,
+            FieldTypeMappingStrategy fieldTypeMappingStrategy) {
+        return sinkFields.stream()
+                .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, nodeId, 
fieldTypeMappingStrategy))
                 .collect(Collectors.toList());
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
index 0f16379d4b..17c15e88b0 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
 import org.apache.inlong.manager.common.consts.StreamType;
+import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
+import 
org.apache.inlong.manager.common.fieldtype.strategy.PostgreSQLFieldTypeStrategy;
 import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
 import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
@@ -39,6 +41,8 @@ import java.util.Map;
  */
 public class PostgreSQLProvider implements ExtractNodeProvider, 
LoadNodeProvider {
 
+    private static final FieldTypeMappingStrategy FIELD_TYPE_MAPPING_STRATEGY 
= new PostgreSQLFieldTypeStrategy();
+
     @Override
     public Boolean accept(String streamType) {
         return StreamType.POSTGRESQL.equals(streamType);
@@ -48,7 +52,7 @@ public class PostgreSQLProvider implements 
ExtractNodeProvider, LoadNodeProvider
     public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
         PostgreSQLSource postgreSQLSource = (PostgreSQLSource) streamNodeInfo;
         List<FieldInfo> fieldInfos = 
parseStreamFieldInfos(postgreSQLSource.getFieldList(),
-                postgreSQLSource.getSourceName());
+                postgreSQLSource.getSourceName(), FIELD_TYPE_MAPPING_STRATEGY);
         Map<String, String> properties = 
parseProperties(postgreSQLSource.getProperties());
 
         return new PostgresExtractNode(postgreSQLSource.getSourceName(),
@@ -74,7 +78,7 @@ public class PostgreSQLProvider implements 
ExtractNodeProvider, LoadNodeProvider
         PostgreSQLSink postgreSQLSink = (PostgreSQLSink) nodeInfo;
         Map<String, String> properties = 
parseProperties(postgreSQLSink.getProperties());
         List<FieldInfo> fieldInfos = 
parseSinkFieldInfos(postgreSQLSink.getSinkFieldList(),
-                postgreSQLSink.getSinkName());
+                postgreSQLSink.getSinkName(), FIELD_TYPE_MAPPING_STRATEGY);
         List<FieldRelation> fieldRelations = 
parseSinkFields(postgreSQLSink.getSinkFieldList(), constantFieldMap);
 
         return new PostgresLoadNode(
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
index a6ce3c96c3..5f4d7f274d 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.util;
 
 import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.enums.FieldType;
+import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
 import org.apache.inlong.manager.pojo.fieldformat.ArrayFormat;
 import org.apache.inlong.manager.pojo.fieldformat.BinaryFormat;
 import org.apache.inlong.manager.pojo.fieldformat.DecimalFormat;
@@ -56,6 +57,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.Objects;
 
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
 
@@ -65,26 +67,48 @@ import static 
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACK
 @Slf4j
 public class FieldInfoUtils {
 
-    public static FieldInfo parseSinkFieldInfo(SinkField sinkField, String 
nodeId) {
+    public static FieldInfo parseSinkFieldInfo(SinkField sinkField, String 
nodeId,
+            FieldTypeMappingStrategy fieldTypeMappingStrategy) {
         boolean isMetaField = sinkField.getIsMetaField() == 1;
+        String fieldType = sinkField.getFieldType();
+        if (Objects.nonNull(fieldTypeMappingStrategy)) {
+            fieldType = 
fieldTypeMappingStrategy.getFieldTypeMapping(fieldType);
+        }
+
         FieldInfo fieldInfo = getFieldInfo(sinkField.getFieldName(),
-                sinkField.getFieldType(), isMetaField, 
sinkField.getMetaFieldName(),
+                fieldType, isMetaField, sinkField.getMetaFieldName(),
                 sinkField.getFieldFormat());
         fieldInfo.setNodeId(nodeId);
         return fieldInfo;
     }
 
-    public static FieldInfo parseStreamFieldInfo(StreamField streamField, 
String nodeId) {
+    public static FieldInfo parseStreamFieldInfo(StreamField streamField, 
String nodeId,
+            FieldTypeMappingStrategy fieldTypeMappingStrategy) {
         boolean isMetaField = streamField.getIsMetaField() == 1;
-        FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), 
streamField.getFieldType(),
+        String fieldType = streamField.getFieldType();
+        if (Objects.nonNull(fieldTypeMappingStrategy)) {
+            fieldType = 
fieldTypeMappingStrategy.getFieldTypeMapping(fieldType);
+        }
+
+        FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), 
fieldType,
                 isMetaField, streamField.getMetaFieldName(), 
streamField.getFieldFormat());
         fieldInfo.setNodeId(nodeId);
         return fieldInfo;
     }
 
     public static FieldInfo parseStreamField(StreamField streamField) {
+        return parseStreamField(streamField, null);
+    }
+
+    public static FieldInfo parseStreamField(StreamField streamField,
+            FieldTypeMappingStrategy fieldTypeMappingStrategy) {
         boolean isMetaField = streamField.getIsMetaField() == 1;
-        FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), 
streamField.getFieldType(),
+        String fieldType = streamField.getFieldType();
+        if (Objects.nonNull(fieldTypeMappingStrategy)) {
+            fieldType = 
fieldTypeMappingStrategy.getFieldTypeMapping(fieldType);
+        }
+
+        FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), 
fieldType,
                 isMetaField, streamField.getMetaFieldName(), 
streamField.getFieldFormat());
         fieldInfo.setNodeId(streamField.getOriginNodeName());
         return fieldInfo;
diff --git 
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java
 
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java
new file mode 100644
index 0000000000..e370c61f62
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.util;
+
+import 
org.apache.inlong.manager.common.fieldtype.strategy.PostgreSQLFieldTypeStrategy;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.sort.formats.common.IntTypeInfo;
+import org.apache.inlong.sort.formats.common.TypeInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for the field type conversion mapping of different data sources.
+ */
+public class FieldInfoUtilsTest {
+
+    @Test
+    public void testPgIntTypeInfo() {
+        StreamField streamField = new StreamField();
+        streamField.setIsMetaField(0);
+        streamField.setFieldName("age");
+        streamField.setFieldType("int4");
+        FieldInfo fieldInfo = FieldInfoUtils.parseStreamFieldInfo(streamField,
+                "nodeId", new PostgreSQLFieldTypeStrategy());
+        TypeInfo typeInfo = fieldInfo.getFormatInfo().getTypeInfo();
+        Assertions.assertTrue(typeInfo instanceof IntTypeInfo);
+    }
+}

Reply via email to