This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 24b2aa4412 [INLONG-8222][Manager] Support different data types mapping
for different data sources by the strategy pattern (#8223)
24b2aa4412 is described below
commit 24b2aa44126af8be5f75614349539d60d64c1408
Author: chestnufang <[email protected]>
AuthorDate: Thu Jun 15 15:19:01 2023 +0800
[INLONG-8222][Manager] Support different data types mapping for different
data sources by the strategy pattern (#8223)
Co-authored-by: chestnufang <[email protected]>
---
.../fieldtype/datasource/BaseFieldTypeMapping.java | 38 +++++
.../datasource/PostgreSQLFieldTypeMapping.java | 168 +++++++++++++++++++++
.../strategy/DefaultFieldTypeStrategy.java | 29 ++++
.../strategy/FieldTypeMappingStrategy.java | 32 ++++
.../strategy/PostgreSQLFieldTypeStrategy.java | 31 ++++
.../pojo/sort/node/base/ExtractNodeProvider.java | 18 ++-
.../pojo/sort/node/base/LoadNodeProvider.java | 17 ++-
.../sort/node/provider/PostgreSQLProvider.java | 8 +-
.../manager/pojo/sort/util/FieldInfoUtils.java | 34 ++++-
.../manager/pojo/sort/util/FieldInfoUtilsTest.java | 45 ++++++
10 files changed, 411 insertions(+), 9 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/BaseFieldTypeMapping.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/BaseFieldTypeMapping.java
new file mode 100644
index 0000000000..9211914e91
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/BaseFieldTypeMapping.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.datasource;
+
+/**
+ * The interface of base field type mapping
+ */
+public interface BaseFieldTypeMapping {
+
+ /**
+ * Get the source field type
+ *
+ * @return The source field type
+ */
+ String getSourceType();
+
+ /**
+ * Get the target field type of inlong field type mapping
+ *
+ * @return The target field type
+ */
+ String getTargetType();
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/PostgreSQLFieldTypeMapping.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/PostgreSQLFieldTypeMapping.java
new file mode 100644
index 0000000000..9f0e46655c
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/datasource/PostgreSQLFieldTypeMapping.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.datasource;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
+
+/**
+ * The enum of PostgreSQL field type mapping.
+ */
+public enum PostgreSQLFieldTypeMapping implements BaseFieldTypeMapping {
+
+ /**
+ * SMALLINT TYPE
+ */
+ SMALLINT("SMALLINT", "SMALLINT"),
+
+ INT2("INT2", "SMALLINT"),
+
+ SMALL_SERIAL("SMALLSERIAL", "SMALLINT"),
+
+ SERIAL2("SERIAL2", "SMALLINT"),
+
+ /**
+ * INT TYPE
+ */
+ SERIAL("SERIAL", "INT"),
+
+ INT4("INT4", "INT"),
+
+ INT("INT", "INT"),
+
+ INTEGER("INTEGER", "INT"),
+
+ INT8("INT8", "BIGINT"),
+
+ /**
+ * BIGINT TYPE
+ */
+ BIGINT("BIGINT", "BIGINT"),
+
+ BIGSERIAL("BIGSERIAL", "BIGINT"),
+
+ /**
+ * FLOAT TYPE
+ */
+ REAL("REAL", "FLOAT"),
+
+ FLOAT4("FLOAT4", "FLOAT"),
+
+ /**
+ * DOUBLE TYPE
+ */
+ FLOAT8("FLOAT8", "DOUBLE"),
+
+ DOUBLE("DOUBLE", "DOUBLE"),
+
+ DOUBLE_PRECISION("DOUBLE PRECISION", "DOUBLE"),
+
+ /**
+ * DECIMAL TYPE
+ */
+ NUMERIC("NUMERIC", "DECIMAL"),
+
+ DECIMAL("DECIMAL", "DECIMAL"),
+
+ /**
+ * BOOLEAN TYPE
+ */
+ BOOLEAN("BOOLEAN", "BOOLEAN"),
+
+ /**
+ * DATE TYPE
+ */
+ DATE("DATE", "DATE"),
+
+ /**
+ * TIME TYPE
+ */
+ TIME("TIME", "TIME"),
+
+ TIMESTAMP("TIMESTAMP", "TIMESTAMP"),
+
+ /**
+ * STRING TYPE
+ */
+ CHAR("CHAR", "STRING"),
+
+ CHARACTER("CHARACTER", "STRING"),
+
+ VARCHAR("VARCHAR", "STRING"),
+
+ CHARACTER_VARYING("CHARACTER VARYING", "STRING"),
+
+ TEXT("TEXT", "STRING"),
+
+ /**
+ * BYTES TYPE
+ */
+ BYTEA("BYTEA", "VARBINARY"),
+
+ /**
+ * ARRAY TYPE
+ */
+ ARRAY("ARRAY", "ARRAY");
+
+ /**
+ * The source data field type
+ */
+ private final String sourceType;
+
+ /**
+ * The target data field type
+ */
+ private final String targetType;
+
+ PostgreSQLFieldTypeMapping(String sourceType, String targetType) {
+ this.sourceType = sourceType;
+ this.targetType = targetType;
+ }
+
+ @Override
+ public String getSourceType() {
+ return sourceType;
+ }
+
+ @Override
+ public String getTargetType() {
+ return targetType;
+ }
+
+ private static final Map<String, String> FIELD_TYPE_MAPPING_MAP = new
HashMap<>();
+
+ static {
+ Stream.of(values()).forEach(v ->
FIELD_TYPE_MAPPING_MAP.put(v.getSourceType(), v.getTargetType()));
+ }
+
+ /**
+ * Get the field type of inlong field type mapping by the source field type.
+ *
+ * @param sourceType the source field type
+ * @return the target field type of inlong field type mapping
+ */
+ public static String getFieldTypeMapping(String sourceType) {
+ String dataType = StringUtils.substringBefore(sourceType,
LEFT_BRACKET).toUpperCase();
+ return FIELD_TYPE_MAPPING_MAP.getOrDefault(dataType,
sourceType.toUpperCase());
+ }
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/DefaultFieldTypeStrategy.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/DefaultFieldTypeStrategy.java
new file mode 100644
index 0000000000..21716952d6
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/DefaultFieldTypeStrategy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.strategy;
+
+/**
+ * The default field type mapping strategy
+ */
+public class DefaultFieldTypeStrategy implements FieldTypeMappingStrategy {
+
+ @Override
+ public String getFieldTypeMapping(String sourceType) {
+ return sourceType;
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/FieldTypeMappingStrategy.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/FieldTypeMappingStrategy.java
new file mode 100644
index 0000000000..b456170bb6
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/FieldTypeMappingStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.strategy;
+
+/**
+ * The interface of base field type mapping strategy operation.
+ */
+public interface FieldTypeMappingStrategy {
+
+ /**
+ * Get the field type of inlong field type mapping by the source field type.
+ *
+ * @param sourceType the source field type
+ * @return the target field type of inlong field type mapping
+ */
+ String getFieldTypeMapping(String sourceType);
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/PostgreSQLFieldTypeStrategy.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/PostgreSQLFieldTypeStrategy.java
new file mode 100644
index 0000000000..235bdb5db5
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/PostgreSQLFieldTypeStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.strategy;
+
+import
org.apache.inlong.manager.common.fieldtype.datasource.PostgreSQLFieldTypeMapping;
+
+/**
+ * The postgresql field type mapping strategy
+ */
+public class PostgreSQLFieldTypeStrategy implements FieldTypeMappingStrategy {
+
+ @Override
+ public String getFieldTypeMapping(String sourceType) {
+ return PostgreSQLFieldTypeMapping.getFieldTypeMapping(sourceType);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index c5aea94d08..3642cc7a3e 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.pojo.sort.node.base;
import org.apache.inlong.common.enums.DataTypeEnum;
+import
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
@@ -60,8 +61,23 @@ public interface ExtractNodeProvider extends NodeProvider {
*/
default List<FieldInfo> parseStreamFieldInfos(List<StreamField>
streamFields, String nodeId) {
// Filter constant fields
+ return parseStreamFieldInfos(streamFields, nodeId, null);
+ }
+
+ /**
+ * Parse StreamFieldInfos
+ *
+ * @param streamFields The stream fields
+ * @param nodeId The node id
+ * @param fieldTypeMappingStrategy The field type mapping operation strategy
+ * @return FieldInfo list
+ */
+ default List<FieldInfo> parseStreamFieldInfos(List<StreamField>
streamFields, String nodeId,
+ FieldTypeMappingStrategy fieldTypeMappingStrategy) {
+ // Filter constant fields
return streamFields.stream().filter(s ->
Objects.isNull(s.getFieldValue()))
- .map(streamFieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
+ .map(streamFieldInfo -> FieldInfoUtils
+ .parseStreamFieldInfo(streamFieldInfo, nodeId,
fieldTypeMappingStrategy))
.collect(Collectors.toList());
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index 2dd27f3416..2df89d9710 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.node.base;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
+import
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -65,7 +66,21 @@ public interface LoadNodeProvider extends NodeProvider {
* @return FieldInfo list
*/
default List<FieldInfo> parseSinkFieldInfos(List<SinkField> sinkFields,
String nodeId) {
- return sinkFields.stream().map(field ->
FieldInfoUtils.parseSinkFieldInfo(field, nodeId))
+ return parseSinkFieldInfos(sinkFields, nodeId, null);
+ }
+
+ /**
+ * Parse FieldInfos
+ *
+ * @param sinkFields The stream sink fields
+ * @param nodeId The node id
+ * @param fieldTypeMappingStrategy The field type mapping operation strategy
+ * @return FieldInfo list
+ */
+ default List<FieldInfo> parseSinkFieldInfos(List<SinkField> sinkFields,
String nodeId,
+ FieldTypeMappingStrategy fieldTypeMappingStrategy) {
+ return sinkFields.stream()
+ .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, nodeId,
fieldTypeMappingStrategy))
.collect(Collectors.toList());
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
index 0f16379d4b..17c15e88b0 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
@@ -18,6 +18,8 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
import org.apache.inlong.manager.common.consts.StreamType;
+import
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
+import
org.apache.inlong.manager.common.fieldtype.strategy.PostgreSQLFieldTypeStrategy;
import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
@@ -39,6 +41,8 @@ import java.util.Map;
*/
public class PostgreSQLProvider implements ExtractNodeProvider,
LoadNodeProvider {
+ private static final FieldTypeMappingStrategy FIELD_TYPE_MAPPING_STRATEGY
= new PostgreSQLFieldTypeStrategy();
+
@Override
public Boolean accept(String streamType) {
return StreamType.POSTGRESQL.equals(streamType);
@@ -48,7 +52,7 @@ public class PostgreSQLProvider implements
ExtractNodeProvider, LoadNodeProvider
public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
PostgreSQLSource postgreSQLSource = (PostgreSQLSource) streamNodeInfo;
List<FieldInfo> fieldInfos =
parseStreamFieldInfos(postgreSQLSource.getFieldList(),
- postgreSQLSource.getSourceName());
+ postgreSQLSource.getSourceName(), FIELD_TYPE_MAPPING_STRATEGY);
Map<String, String> properties =
parseProperties(postgreSQLSource.getProperties());
return new PostgresExtractNode(postgreSQLSource.getSourceName(),
@@ -74,7 +78,7 @@ public class PostgreSQLProvider implements
ExtractNodeProvider, LoadNodeProvider
PostgreSQLSink postgreSQLSink = (PostgreSQLSink) nodeInfo;
Map<String, String> properties =
parseProperties(postgreSQLSink.getProperties());
List<FieldInfo> fieldInfos =
parseSinkFieldInfos(postgreSQLSink.getSinkFieldList(),
- postgreSQLSink.getSinkName());
+ postgreSQLSink.getSinkName(), FIELD_TYPE_MAPPING_STRATEGY);
List<FieldRelation> fieldRelations =
parseSinkFields(postgreSQLSink.getSinkFieldList(), constantFieldMap);
return new PostgresLoadNode(
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
index a6ce3c96c3..5f4d7f274d 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.util;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.manager.common.enums.FieldType;
+import
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.pojo.fieldformat.ArrayFormat;
import org.apache.inlong.manager.pojo.fieldformat.BinaryFormat;
import org.apache.inlong.manager.pojo.fieldformat.DecimalFormat;
@@ -56,6 +57,7 @@ import org.apache.commons.lang3.StringUtils;
import java.math.BigDecimal;
import java.util.List;
+import java.util.Objects;
import static
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
@@ -65,26 +67,48 @@ import static
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACK
@Slf4j
public class FieldInfoUtils {
- public static FieldInfo parseSinkFieldInfo(SinkField sinkField, String
nodeId) {
+ public static FieldInfo parseSinkFieldInfo(SinkField sinkField, String
nodeId,
+ FieldTypeMappingStrategy fieldTypeMappingStrategy) {
boolean isMetaField = sinkField.getIsMetaField() == 1;
+ String fieldType = sinkField.getFieldType();
+ if (Objects.nonNull(fieldTypeMappingStrategy)) {
+ fieldType =
fieldTypeMappingStrategy.getFieldTypeMapping(fieldType);
+ }
+
FieldInfo fieldInfo = getFieldInfo(sinkField.getFieldName(),
- sinkField.getFieldType(), isMetaField,
sinkField.getMetaFieldName(),
+ fieldType, isMetaField, sinkField.getMetaFieldName(),
sinkField.getFieldFormat());
fieldInfo.setNodeId(nodeId);
return fieldInfo;
}
- public static FieldInfo parseStreamFieldInfo(StreamField streamField,
String nodeId) {
+ public static FieldInfo parseStreamFieldInfo(StreamField streamField,
String nodeId,
+ FieldTypeMappingStrategy fieldTypeMappingStrategy) {
boolean isMetaField = streamField.getIsMetaField() == 1;
- FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(),
streamField.getFieldType(),
+ String fieldType = streamField.getFieldType();
+ if (Objects.nonNull(fieldTypeMappingStrategy)) {
+ fieldType =
fieldTypeMappingStrategy.getFieldTypeMapping(fieldType);
+ }
+
+ FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(),
fieldType,
isMetaField, streamField.getMetaFieldName(),
streamField.getFieldFormat());
fieldInfo.setNodeId(nodeId);
return fieldInfo;
}
public static FieldInfo parseStreamField(StreamField streamField) {
+ return parseStreamField(streamField, null);
+ }
+
+ public static FieldInfo parseStreamField(StreamField streamField,
+ FieldTypeMappingStrategy fieldTypeMappingStrategy) {
boolean isMetaField = streamField.getIsMetaField() == 1;
- FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(),
streamField.getFieldType(),
+ String fieldType = streamField.getFieldType();
+ if (Objects.nonNull(fieldTypeMappingStrategy)) {
+ fieldType =
fieldTypeMappingStrategy.getFieldTypeMapping(fieldType);
+ }
+
+ FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(),
fieldType,
isMetaField, streamField.getMetaFieldName(),
streamField.getFieldFormat());
fieldInfo.setNodeId(streamField.getOriginNodeName());
return fieldInfo;
diff --git
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java
new file mode 100644
index 0000000000..e370c61f62
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.util;
+
+import
org.apache.inlong.manager.common.fieldtype.strategy.PostgreSQLFieldTypeStrategy;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.sort.formats.common.IntTypeInfo;
+import org.apache.inlong.sort.formats.common.TypeInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Different data source field type conversion mapping test class.
+ */
+public class FieldInfoUtilsTest {
+
+ @Test
+ public void testPgIntTypeInfo() {
+ StreamField streamField = new StreamField();
+ streamField.setIsMetaField(0);
+ streamField.setFieldName("age");
+ streamField.setFieldType("int4");
+ FieldInfo fieldInfo = FieldInfoUtils.parseStreamFieldInfo(streamField,
+ "nodeId", new PostgreSQLFieldTypeStrategy());
+ TypeInfo typeInfo = fieldInfo.getFormatInfo().getTypeInfo();
+ Assertions.assertTrue(typeInfo instanceof IntTypeInfo);
+ }
+}