This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0318b853d1 [INLONG-8154][Manager] Use the factory pattern to optimize 
the way of creating ExtractNodes (#8162)
0318b853d1 is described below

commit 0318b853d11e3badd06fba75ecee7218cbf42fb0
Author: chestnufang <[email protected]>
AuthorDate: Tue Jun 6 15:35:54 2023 +0800

    [INLONG-8154][Manager] Use the factory pattern to optimize the way of 
creating ExtractNodes (#8162)
    
    Co-authored-by: chestnufang <[email protected]>
---
 .../pojo/sort/node/ExtractNodeProvider.java        | 130 +++++
 .../manager/pojo/sort/node/NodeProvider.java       |  40 ++
 .../pojo/sort/node/NodeProviderFactory.java        |  93 ++++
 .../pojo/sort/node/extract/HudiProvider.java       |  64 +++
 .../pojo/sort/node/extract/KafkaProvider.java      |  93 ++++
 .../pojo/sort/node/extract/MongoProvider.java      |  59 +++
 .../sort/node/extract/MysqlBinlogProvider.java     |  85 +++
 .../pojo/sort/node/extract/OracleProvider.java     |  69 +++
 .../pojo/sort/node/extract/PostgreSqlProvider.java |  64 +++
 .../pojo/sort/node/extract/PulsarProvider.java     |  81 +++
 .../pojo/sort/node/extract/RedisProvider.java      | 130 +++++
 .../pojo/sort/node/extract/SqlServerProvider.java  |  63 +++
 .../pojo/sort/node/extract/TubeMqProvider.java     |  60 +++
 .../manager/pojo/sort/util/ExtractNodeUtils.java   | 585 ---------------------
 .../resource/sort/DefaultSortConfigOperator.java   |   4 +-
 15 files changed, 1033 insertions(+), 587 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
new file mode 100644
index 0000000000..aeed2df4c8
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.RawFormat;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Interface of the extract node provider
+ */
+public interface ExtractNodeProvider extends NodeProvider {
+
+    /**
+     * Determines whether the current instance matches the specified type.
+     *
+     * @param sourceType the specified source type
+     * @return whether the current instance matches the specified type
+     */
+    Boolean accept(String sourceType);
+
+    /**
+     * Create extract node by stream node info
+     *
+     * @param nodeInfo stream node info
+     * @return the extract node
+     */
+    ExtractNode createNode(StreamNode nodeInfo);
+
+    /**
+     * Parse FieldInfos
+     *
+     * @param streamFields The stream fields
+     * @param nodeId The node id
+     * @return FieldInfo list
+     */
+    default List<FieldInfo> parseFieldInfos(List<StreamField> streamFields, 
String nodeId) {
+        // Filter out constant fields (those that already carry a field value)
+        return streamFields.stream().filter(s -> 
Objects.isNull(s.getFieldValue()))
+                .map(streamFieldInfo -> 
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Parse format
+     *
+     * @param serializationType data serialization type, supports: csv, json, canal, 
avro, etc.
+     * @param wrapWithInlongMsg whether to wrap the content with {@link 
InLongMsgFormat}
+     * @param separatorStr the separator of the data content; for CSV, a numeric string is interpreted as a character code
+     * @param ignoreParseErrors whether to ignore data that fails deserialization
+     * @return the format for serialized content
+     */
+    default Format parsingFormat(
+            String serializationType,
+            boolean wrapWithInlongMsg,
+            String separatorStr,
+            boolean ignoreParseErrors) {
+        Format format;
+        DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
+        switch (dataType) {
+            case CSV:
+                if (StringUtils.isNumeric(separatorStr)) {
+                    char dataSeparator = (char) Integer.parseInt(separatorStr);
+                    separatorStr = Character.toString(dataSeparator);
+                }
+                CsvFormat csvFormat = new CsvFormat(separatorStr);
+                csvFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = csvFormat;
+                break;
+            case AVRO:
+                format = new AvroFormat();
+                break;
+            case JSON:
+                JsonFormat jsonFormat = new JsonFormat();
+                jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = jsonFormat;
+                break;
+            case CANAL:
+                format = new CanalJsonFormat();
+                break;
+            case DEBEZIUM_JSON:
+                DebeziumJsonFormat debeziumJsonFormat = new 
DebeziumJsonFormat();
+                debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = debeziumJsonFormat;
+                break;
+            case RAW:
+                format = new RawFormat();
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported 
dataType=%s", dataType));
+        }
+        if (wrapWithInlongMsg) {
+            Format innerFormat = format;
+            format = new InLongMsgFormat(innerFormat, false);
+        }
+        return format;
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
new file mode 100644
index 0000000000..f8f7efd030
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Interface of the node provider
+ */
+public interface NodeProvider {
+
+    /**
+     * Parse properties
+     *
+     * @param properties The properties with string key and object value
+     * @return The properties with string key and string value
+     */
+    default Map<String, String> parseProperties(Map<String, Object> 
properties) {
+        return properties.entrySet().stream()
+                .filter(v -> Objects.nonNull(v.getValue()))
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
new file mode 100644
index 0000000000..35c447ec7e
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.sort.node.extract.HudiProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.KafkaProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.MongoProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.MysqlBinlogProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.OracleProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.PostgreSqlProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.PulsarProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.RedisProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.SqlServerProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.TubeMqProvider;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The node provider factory.
+ */
+public class NodeProviderFactory {
+
+    /**
+     * The extract node provider collection
+     */
+    private static final List<ExtractNodeProvider> EXTRACT_NODE_PROVIDER_LIST 
= new ArrayList<>();
+
+    static {
+        // The providers that parse source info into the ExtractNode required by Sort
+        EXTRACT_NODE_PROVIDER_LIST.add(new HudiProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new KafkaProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new MongoProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new OracleProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new PulsarProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new RedisProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new TubeMqProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new SqlServerProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new PostgreSqlProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new MysqlBinlogProvider());
+
+    }
+
+    /**
+     * Get extract node provider
+     *
+     * @param sourceType the specified source type
+     * @return the extract node provider
+     */
+    public static ExtractNodeProvider getExtractNodeProvider(String 
sourceType) {
+        return EXTRACT_NODE_PROVIDER_LIST.stream()
+                .filter(inst -> inst.accept(sourceType))
+                .findFirst()
+                .orElseThrow(() -> new 
BusinessException(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT,
+                        
String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT.getMessage(), sourceType)));
+    }
+
+    /**
+     * Create extract nodes from the given sources.
+     */
+    public static List<ExtractNode> createExtractNodes(List<StreamSource> 
sourceInfos) {
+        if (CollectionUtils.isEmpty(sourceInfos)) {
+            return Lists.newArrayList();
+        }
+        return sourceInfos.stream().map(v -> {
+            String sourceType = v.getSourceType();
+            return getExtractNodeProvider(sourceType).createNode(v);
+        }).collect(Collectors.toList());
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
new file mode 100644
index 0000000000..ebbd00e326
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.hudi.HudiSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Hudi extract nodes.
+ */
+public class HudiProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.HUDI.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        HudiSource source = (HudiSource) streamNodeInfo;
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), 
source.getSourceName());
+        Map<String, String> properties = 
parseProperties(source.getProperties());
+
+        return new HudiExtractNode(
+                source.getSourceName(),
+                source.getSourceName(),
+                fieldInfos,
+                null,
+                source.getCatalogUri(),
+                source.getWarehouse(),
+                source.getDbName(),
+                source.getTableName(),
+                CatalogType.HIVE,
+                source.getCheckIntervalInMinus(),
+                source.isReadStreamingSkipCompaction(),
+                source.getReadStartCommit(),
+                properties,
+                source.getExtList());
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
new file mode 100644
index 0000000000..88b63f28e7
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Kafka extract nodes.
+ */
+public class KafkaProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.KAFKA.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        KafkaSource kafkaSource = (KafkaSource) streamNodeInfo;
+        List<FieldInfo> fieldInfos = 
parseFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
+        Map<String, String> properties = 
parseProperties(kafkaSource.getProperties());
+
+        String topic = kafkaSource.getTopic();
+        String bootstrapServers = kafkaSource.getBootstrapServers();
+
+        Format format = parsingFormat(
+                kafkaSource.getSerializationType(),
+                kafkaSource.isWrapWithInlongMsg(),
+                kafkaSource.getDataSeparator(),
+                kafkaSource.isIgnoreParseErrors());
+
+        KafkaOffset kafkaOffset = 
KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
+        KafkaScanStartupMode startupMode;
+        switch (kafkaOffset) {
+            case EARLIEST:
+                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
+                break;
+            case SPECIFIC:
+                startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
+                break;
+            case TIMESTAMP_MILLIS:
+                startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
+                break;
+            case LATEST:
+            default:
+                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
+        }
+        final String primaryKey = kafkaSource.getPrimaryKey();
+        String groupId = kafkaSource.getGroupId();
+        String partitionOffset = kafkaSource.getPartitionOffsets();
+        String scanTimestampMillis = kafkaSource.getTimestampMillis();
+        return new KafkaExtractNode(kafkaSource.getSourceName(),
+                kafkaSource.getSourceName(),
+                fieldInfos,
+                null,
+                properties,
+                topic,
+                bootstrapServers,
+                format,
+                startupMode,
+                primaryKey,
+                groupId,
+                partitionOffset,
+                scanTimestampMillis);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
new file mode 100644
index 0000000000..72013e2386
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.mongodb.MongoDBSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Mongo extract nodes.
+ */
+public class MongoProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.MONGODB.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        MongoDBSource source = (MongoDBSource) streamNodeInfo;
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), 
source.getSourceName());
+        Map<String, String> properties = 
parseProperties(source.getProperties());
+
+        return new MongoExtractNode(
+                source.getSourceName(),
+                source.getSourceName(),
+                fieldInfos,
+                null,
+                properties,
+                source.getCollection(),
+                source.getHosts(),
+                source.getUsername(),
+                source.getPassword(),
+                source.getDatabase());
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
new file mode 100644
index 0000000000..9732a8d277
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+
+import com.google.common.base.Splitter;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating MysqlBinlog extract nodes.
+ */
+public class MysqlBinlogProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.MYSQL_BINLOG.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        MySQLBinlogSource binlogSource = (MySQLBinlogSource) streamNodeInfo;
+        List<FieldInfo> fieldInfos = 
parseFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
+        Map<String, String> properties = 
parseProperties(binlogSource.getProperties());
+
+        final String database = binlogSource.getDatabaseWhiteList();
+        final String primaryKey = binlogSource.getPrimaryKey();
+        final String hostName = binlogSource.getHostname();
+        final String username = binlogSource.getUser();
+        final String password = binlogSource.getPassword();
+        final Integer port = binlogSource.getPort();
+        Integer serverId = null;
+        if (binlogSource.getServerId() != null && binlogSource.getServerId() > 
0) {
+            serverId = binlogSource.getServerId();
+        }
+        String tables = binlogSource.getTableWhiteList();
+        final List<String> tableNames = Splitter.on(",").splitToList(tables);
+        final String serverTimeZone = binlogSource.getServerTimezone();
+
+        // TODO: make these parameters configurable
+        if (binlogSource.isAllMigration()) {
+            // Property that applies only when migrating all tables in the database
+            properties.put("migrate-all", "true");
+        }
+
+        return new MySqlExtractNode(binlogSource.getSourceName(),
+                binlogSource.getSourceName(),
+                fieldInfos,
+                null,
+                properties,
+                primaryKey,
+                tableNames,
+                hostName,
+                username,
+                password,
+                database,
+                port,
+                serverId,
+                true,
+                serverTimeZone);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
new file mode 100644
index 0000000000..72993b723a
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Oracle extract nodes.
+ */
+public class OracleProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.ORACLE.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        OracleSource source = (OracleSource) streamNodeInfo;
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), 
source.getSourceName());
+        Map<String, String> properties = 
parseProperties(source.getProperties());
+
+        ScanStartUpMode scanStartupMode = 
StringUtils.isBlank(source.getScanStartupMode())
+                ? null
+                : ScanStartUpMode.forName(source.getScanStartupMode());
+        return new OracleExtractNode(
+                source.getSourceName(),
+                source.getSourceName(),
+                fieldInfos,
+                null,
+                properties,
+                source.getPrimaryKey(),
+                source.getHostname(),
+                source.getUsername(),
+                source.getPassword(),
+                source.getDatabase(),
+                source.getSchemaName(),
+                source.getTableName(),
+                source.getPort(),
+                scanStartupMode);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
new file mode 100644
index 0000000000..128c30018e
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating PostgreSQL extract nodes: accepts SourceType.POSTGRESQL and maps a PostgreSQLSource onto a PostgresExtractNode.
+ */
+public class PostgreSqlProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) { // true only when sourceType equals SourceType.POSTGRESQL; a null sourceType yields false
+        return SourceType.POSTGRESQL.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        PostgreSQLSource postgreSQLSource = (PostgreSQLSource) streamNodeInfo; // unchecked downcast: any other non-null StreamNode subtype fails with ClassCastException
+        List<FieldInfo> fieldInfos = 
parseFieldInfos(postgreSQLSource.getFieldList(), 
postgreSQLSource.getSourceName()); // helper is not defined in this class; presumably inherited from ExtractNodeProvider
+        Map<String, String> properties = 
parseProperties(postgreSQLSource.getProperties());
+
+        return new PostgresExtractNode(postgreSQLSource.getSourceName(), // the source name is passed twice; presumably node id and node name - confirm
+                postgreSQLSource.getSourceName(),
+                fieldInfos,
+                null, // NOTE(review): fourth argument is always null here; its meaning is defined by the PostgresExtractNode constructor - confirm
+                properties,
+                postgreSQLSource.getPrimaryKey(),
+                postgreSQLSource.getTableNameList(),
+                postgreSQLSource.getHostname(),
+                postgreSQLSource.getUsername(),
+                postgreSQLSource.getPassword(),
+                postgreSQLSource.getDatabase(),
+                postgreSQLSource.getSchema(),
+                postgreSQLSource.getPort(),
+                postgreSQLSource.getDecodingPluginName(),
+                postgreSQLSource.getServerTimeZone(),
+                postgreSQLSource.getScanStartupMode()); // forwarded as-is: no blank check or enum conversion is applied in this provider
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
new file mode 100644
index 0000000000..14cde851b7
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Pulsar extract nodes: accepts SourceType.PULSAR and maps a PulsarSource onto a PulsarExtractNode.
+ */
+public class PulsarProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) { // true only when sourceType equals SourceType.PULSAR; a null sourceType yields false
+        return SourceType.PULSAR.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        PulsarSource pulsarSource = (PulsarSource) streamNodeInfo; // unchecked downcast: any other non-null StreamNode subtype fails with ClassCastException
+        List<FieldInfo> fieldInfos = 
parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName()); // helper is not defined in this class; presumably inherited from ExtractNodeProvider
+        Map<String, String> properties = 
parseProperties(pulsarSource.getProperties());
+
+        String fullTopicName = // "tenant/namespace/topic"; a null part is rendered as the literal text "null" by string concatenation
+                pulsarSource.getPulsarTenant() + "/" + 
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
+
+        Format format = parsingFormat(pulsarSource.getSerializationType(), // parsingFormat is not defined in this class; presumably inherited from ExtractNodeProvider
+                pulsarSource.isWrapWithInlongMsg(),
+                pulsarSource.getDataSeparator(),
+                pulsarSource.isIgnoreParseError());
+
+        PulsarScanStartupMode startupMode = 
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode()); // NOTE(review): result is dereferenced below without a null check - confirm forName never returns null
+        final String primaryKey = pulsarSource.getPrimaryKey();
+        final String serviceUrl = pulsarSource.getServiceUrl();
+        final String adminUrl = pulsarSource.getAdminUrl();
+        final String scanStartupSubStartOffset = // EARLIEST value when a subscription name is set, otherwise null
+                StringUtils.isNotBlank(pulsarSource.getSubscription()) ? 
PulsarScanStartupMode.EARLIEST.getValue()
+                        : null;
+
+        return new PulsarExtractNode(pulsarSource.getSourceName(), // the source name is passed twice; presumably node id and node name - confirm
+                pulsarSource.getSourceName(),
+                fieldInfos,
+                null, // NOTE(review): fourth argument is always null here; its meaning is defined by the PulsarExtractNode constructor - confirm
+                properties,
+                fullTopicName,
+                adminUrl,
+                serviceUrl,
+                format,
+                startupMode.getValue(), // the startup mode is handed over as its string value, not as the enum
+                primaryKey,
+                pulsarSource.getSubscription(),
+                scanStartupSubStartOffset);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
new file mode 100644
index 0000000000..18f435e89f
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
+import org.apache.inlong.manager.pojo.source.redis.RedisSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.LookupOptions;
+import org.apache.inlong.sort.protocol.enums.RedisCommand;
+import org.apache.inlong.sort.protocol.enums.RedisMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Redis extract nodes: accepts SourceType.REDIS and builds a RedisExtractNode per deployment mode (standalone, sentinel, cluster).
+ */
+public class RedisProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) { // true only when sourceType equals SourceType.REDIS; a null sourceType yields false
+        return SourceType.REDIS.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        RedisSource source = (RedisSource) streamNodeInfo; // unchecked downcast: any other non-null StreamNode subtype fails with ClassCastException
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), 
source.getSourceName()); // helper is not defined in this class; presumably inherited from ExtractNodeProvider
+        Map<String, String> properties = 
parseProperties(source.getProperties());
+
+        RedisMode redisMode = RedisMode.forName(source.getRedisMode()); // NOTE(review): a null result would make the switch below throw NullPointerException - confirm forName never returns null
+        switch (redisMode) { // the three branches differ only in the connection arguments that follow the command
+            case STANDALONE: // single server: addressed by host and port
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getHost(),
+                        source.getPort(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            case SENTINEL: // sentinel deployment: addressed by master name and sentinels info
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getMasterName(),
+                        source.getSentinelsInfo(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            case CLUSTER: // cluster deployment: addressed by the cluster nodes value only
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getClusterNodes(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            default: // any other mode value is rejected rather than mapped to a fallback node
+                throw new IllegalArgumentException(String.format("Unsupported 
redis-mode=%s for Inlong", redisMode));
+        }
+    }
+
+    /**
+     * Convert the manager-side RedisLookupOptions into the sort protocol LookupOptions.
+     *
+     * @param options RedisLookupOptions to convert, may be null
+     * @return LookupOptions built from cache max rows, cache TTL, max retries and the async flag, or null when options is null
+     */
+    private static LookupOptions parseLookupOptions(RedisLookupOptions 
options) {
+        if (options == null) { // absent lookup options are passed through as null, not replaced by defaults
+            return null;
+        }
+        return new LookupOptions(options.getLookupCacheMaxRows(), 
options.getLookupCacheTtl(),
+                options.getLookupMaxRetries(), options.getLookupAsync());
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
new file mode 100644
index 0000000000..f9ca5f985a
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating SQLServer extract nodes: accepts SourceType.SQLSERVER and maps a SQLServerSource onto a SqlServerExtractNode.
+ */
+public class SqlServerProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) { // true only when sourceType equals SourceType.SQLSERVER; a null sourceType yields false
+        return SourceType.SQLSERVER.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        SQLServerSource source = (SQLServerSource) streamNodeInfo; // unchecked downcast: any other non-null StreamNode subtype fails with ClassCastException
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), 
source.getSourceName()); // helper is not defined in this class; presumably inherited from ExtractNodeProvider
+        Map<String, String> properties = 
parseProperties(source.getProperties());
+
+        return new SqlServerExtractNode(
+                source.getSourceName(), // the source name is passed twice; presumably node id and node name - confirm against SqlServerExtractNode
+                source.getSourceName(),
+                fieldInfos,
+                null, // NOTE(review): fourth argument is always null here; its meaning is defined by the SqlServerExtractNode constructor - confirm
+                properties,
+                source.getPrimaryKey(),
+                source.getHostname(),
+                source.getPort(), // note: port directly follows hostname here, whereas the Oracle provider passes port after the table name
+                source.getUsername(),
+                source.getPassword(),
+                source.getDatabase(),
+                source.getSchemaName(),
+                source.getTableName(),
+                source.getServerTimezone());
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
new file mode 100644
index 0000000000..2a61b385a1
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating TubeMQ extract nodes: accepts SourceType.TUBEMQ and maps a TubeMQSource onto a TubeMQExtractNode.
+ */
+public class TubeMqProvider implements ExtractNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) { // true only when sourceType equals SourceType.TUBEMQ; a null sourceType yields false
+        return SourceType.TUBEMQ.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createNode(StreamNode streamNodeInfo) {
+        TubeMQSource source = (TubeMQSource) streamNodeInfo; // unchecked downcast: any other non-null StreamNode subtype fails with ClassCastException
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), 
source.getSourceName()); // helper is not defined in this class; presumably inherited from ExtractNodeProvider
+        Map<String, String> properties = 
parseProperties(source.getProperties());
+
+        return new TubeMQExtractNode(
+                source.getSourceName(), // the source name is passed twice; presumably node id and node name - confirm against TubeMQExtractNode
+                source.getSourceName(),
+                fieldInfos,
+                null, // NOTE(review): fourth argument is always null here; its meaning is defined by the TubeMQExtractNode constructor - confirm
+                properties,
+                source.getMasterRpc(),
+                source.getTopic(),
+                source.getSerializationType(), // passed through as returned by the source; no Format object is built in this provider
+                source.getGroupId(),
+                source.getSessionKey(),
+                source.getTid());
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
deleted file mode 100644
index 44ea1b4fb5..0000000000
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.util;
-
-import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.hudi.HudiSource;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.pojo.source.mongodb.MongoDBSource;
-import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
-import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
-import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
-import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
-import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
-import org.apache.inlong.manager.pojo.source.redis.RedisSource;
-import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
-import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
-import org.apache.inlong.manager.pojo.stream.StreamField;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.LookupOptions;
-import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
-import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
-import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
-import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
-import org.apache.inlong.sort.protocol.enums.RedisCommand;
-import org.apache.inlong.sort.protocol.enums.RedisMode;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
-import org.apache.inlong.sort.protocol.node.format.AvroFormat;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.CsvFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.Format;
-import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
-import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.node.format.RawFormat;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Parse SourceInfo to ExtractNode which sort needed
- */
-@Slf4j
-public class ExtractNodeUtils {
-
-    /**
-     * Create extract nodes from the given sources.
-     */
-    public static List<ExtractNode> createExtractNodes(List<StreamSource> 
sourceInfos) {
-        if (CollectionUtils.isEmpty(sourceInfos)) {
-            return Lists.newArrayList();
-        }
-        return sourceInfos.stream().map(ExtractNodeUtils::createExtractNode)
-                .collect(Collectors.toList());
-    }
-
-    public static ExtractNode createExtractNode(StreamSource sourceInfo) {
-        String sourceType = sourceInfo.getSourceType();
-        switch (sourceType) {
-            case SourceType.MYSQL_BINLOG:
-                return createExtractNode((MySQLBinlogSource) sourceInfo);
-            case SourceType.KAFKA:
-                return createExtractNode((KafkaSource) sourceInfo);
-            case SourceType.PULSAR:
-                return createExtractNode((PulsarSource) sourceInfo);
-            case SourceType.POSTGRESQL:
-                return createExtractNode((PostgreSQLSource) sourceInfo);
-            case SourceType.ORACLE:
-                return createExtractNode((OracleSource) sourceInfo);
-            case SourceType.SQLSERVER:
-                return createExtractNode((SQLServerSource) sourceInfo);
-            case SourceType.MONGODB:
-                return createExtractNode((MongoDBSource) sourceInfo);
-            case SourceType.TUBEMQ:
-                return createExtractNode((TubeMQSource) sourceInfo);
-            case SourceType.REDIS:
-                return createExtractNode((RedisSource) sourceInfo);
-            case SourceType.HUDI:
-                return createExtractNode((HudiSource) sourceInfo);
-            default:
-                throw new IllegalArgumentException(
-                        String.format("Unsupported sourceType=%s to create 
extractNode", sourceType));
-        }
-    }
-
-    /**
-     * Create MySql extract node
-     *
-     * @param binlogSource MySql binlog source info
-     * @return MySql extract node info
-     */
-    public static MySqlExtractNode createExtractNode(MySQLBinlogSource 
binlogSource) {
-        final String database = binlogSource.getDatabaseWhiteList();
-        final String primaryKey = binlogSource.getPrimaryKey();
-        final String hostName = binlogSource.getHostname();
-        final String username = binlogSource.getUser();
-        final String password = binlogSource.getPassword();
-        final Integer port = binlogSource.getPort();
-        Integer serverId = null;
-        if (binlogSource.getServerId() != null && binlogSource.getServerId() > 
0) {
-            serverId = binlogSource.getServerId();
-        }
-        String tables = binlogSource.getTableWhiteList();
-        final List<String> tableNames = Splitter.on(",").splitToList(tables);
-        List<FieldInfo> fieldInfos = 
parseFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
-        final String serverTimeZone = binlogSource.getServerTimezone();
-
-        // TODO Needs to be configurable for those parameters
-        Map<String, String> properties = 
parseProperties(binlogSource.getProperties());
-        if (binlogSource.isAllMigration()) {
-            // Unique properties when migrate all tables in database
-            properties.put("migrate-all", "true");
-        }
-
-        return new MySqlExtractNode(binlogSource.getSourceName(),
-                binlogSource.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                primaryKey,
-                tableNames,
-                hostName,
-                username,
-                password,
-                database,
-                port,
-                serverId,
-                true,
-                serverTimeZone);
-    }
-
-    /**
-     * Create Kafka extract node
-     *
-     * @param kafkaSource Kafka source info
-     * @return Kafka extract node info
-     */
-    public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
-        List<FieldInfo> fieldInfos = 
parseFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
-        String topic = kafkaSource.getTopic();
-        String bootstrapServers = kafkaSource.getBootstrapServers();
-
-        Format format = parsingFormat(
-                kafkaSource.getSerializationType(),
-                kafkaSource.isWrapWithInlongMsg(),
-                kafkaSource.getDataSeparator(),
-                kafkaSource.isIgnoreParseErrors());
-
-        KafkaOffset kafkaOffset = 
KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
-        KafkaScanStartupMode startupMode;
-        switch (kafkaOffset) {
-            case EARLIEST:
-                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
-                break;
-            case SPECIFIC:
-                startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
-                break;
-            case TIMESTAMP_MILLIS:
-                startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
-                break;
-            case LATEST:
-            default:
-                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
-        }
-        final String primaryKey = kafkaSource.getPrimaryKey();
-        String groupId = kafkaSource.getGroupId();
-        Map<String, String> properties = 
parseProperties(kafkaSource.getProperties());
-        String partitionOffset = kafkaSource.getPartitionOffsets();
-        String scanTimestampMillis = kafkaSource.getTimestampMillis();
-        return new KafkaExtractNode(kafkaSource.getSourceName(),
-                kafkaSource.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                topic,
-                bootstrapServers,
-                format,
-                startupMode,
-                primaryKey,
-                groupId,
-                partitionOffset,
-                scanTimestampMillis);
-    }
-
-    /**
-     * Create Pulsar extract node
-     *
-     * @param pulsarSource Pulsar source info
-     * @return Pulsar extract node info
-     */
-    public static PulsarExtractNode createExtractNode(PulsarSource 
pulsarSource) {
-        List<FieldInfo> fieldInfos = 
parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
-        String fullTopicName =
-                pulsarSource.getPulsarTenant() + "/" + 
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
-
-        Format format = parsingFormat(pulsarSource.getSerializationType(),
-                pulsarSource.isWrapWithInlongMsg(),
-                pulsarSource.getDataSeparator(),
-                pulsarSource.isIgnoreParseError());
-
-        PulsarScanStartupMode startupMode = 
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
-        final String primaryKey = pulsarSource.getPrimaryKey();
-        final String serviceUrl = pulsarSource.getServiceUrl();
-        final String adminUrl = pulsarSource.getAdminUrl();
-        final String scanStartupSubStartOffset =
-                StringUtils.isNotBlank(pulsarSource.getSubscription()) ? 
PulsarScanStartupMode.EARLIEST.getValue()
-                        : null;
-        Map<String, String> properties = 
parseProperties(pulsarSource.getProperties());
-
-        return new PulsarExtractNode(pulsarSource.getSourceName(),
-                pulsarSource.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                fullTopicName,
-                adminUrl,
-                serviceUrl,
-                format,
-                startupMode.getValue(),
-                primaryKey,
-                pulsarSource.getSubscription(),
-                scanStartupSubStartOffset);
-    }
-
-    /**
-     * Create PostgreSQL extract node
-     *
-     * @param postgreSQLSource PostgreSQL source info
-     * @return PostgreSQL extract node info
-     */
-    public static PostgresExtractNode createExtractNode(PostgreSQLSource 
postgreSQLSource) {
-        List<FieldInfo> fieldInfos = 
parseFieldInfos(postgreSQLSource.getFieldList(), 
postgreSQLSource.getSourceName());
-        Map<String, String> properties = 
parseProperties(postgreSQLSource.getProperties());
-        return new PostgresExtractNode(postgreSQLSource.getSourceName(),
-                postgreSQLSource.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                postgreSQLSource.getPrimaryKey(),
-                postgreSQLSource.getTableNameList(),
-                postgreSQLSource.getHostname(),
-                postgreSQLSource.getUsername(),
-                postgreSQLSource.getPassword(),
-                postgreSQLSource.getDatabase(),
-                postgreSQLSource.getSchema(),
-                postgreSQLSource.getPort(),
-                postgreSQLSource.getDecodingPluginName(),
-                postgreSQLSource.getServerTimeZone(),
-                postgreSQLSource.getScanStartupMode());
-    }
-
-    /**
-     * Create Oracle extract node
-     *
-     * @param source Oracle source info
-     * @return oracle extract node info
-     */
-    public static OracleExtractNode createExtractNode(OracleSource source) {
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        ScanStartUpMode scanStartupMode = StringUtils.isBlank(source.getScanStartupMode())
-                ? null
-                : ScanStartUpMode.forName(source.getScanStartupMode());
-        Map<String, String> properties = parseProperties(source.getProperties());
-        return new OracleExtractNode(
-                source.getSourceName(),
-                source.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                source.getPrimaryKey(),
-                source.getHostname(),
-                source.getUsername(),
-                source.getPassword(),
-                source.getDatabase(),
-                source.getSchemaName(),
-                source.getTableName(),
-                source.getPort(),
-                scanStartupMode);
-    }
-
-    /**
-     * Create SQLServer extract node
-     *
-     * @param source SQLServer source info
-     * @return SQLServer extract node info
-     */
-    public static SqlServerExtractNode createExtractNode(SQLServerSource source) {
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        Map<String, String> properties = parseProperties(source.getProperties());
-        return new SqlServerExtractNode(
-                source.getSourceName(),
-                source.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                source.getPrimaryKey(),
-                source.getHostname(),
-                source.getPort(),
-                source.getUsername(),
-                source.getPassword(),
-                source.getDatabase(),
-                source.getSchemaName(),
-                source.getTableName(),
-                source.getServerTimezone());
-    }
-
-    /**
-     * Create MongoDB extract node
-     *
-     * @param source MongoDB source info
-     * @return MongoDB extract node info
-     */
-    public static MongoExtractNode createExtractNode(MongoDBSource source) {
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        Map<String, String> properties = parseProperties(source.getProperties());
-        return new MongoExtractNode(
-                source.getSourceName(),
-                source.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                source.getCollection(),
-                source.getHosts(),
-                source.getUsername(),
-                source.getPassword(),
-                source.getDatabase());
-    }
-
-    /**
-     * Create TubeMQ extract node
-     *
-     * @param source TubeMQ source info
-     * @return TubeMQ extract node info
-     */
-    public static TubeMQExtractNode createExtractNode(TubeMQSource source) {
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        Map<String, String> properties = parseProperties(source.getProperties());
-        return new TubeMQExtractNode(
-                source.getSourceName(),
-                source.getSourceName(),
-                fieldInfos,
-                null,
-                properties,
-                source.getMasterRpc(),
-                source.getTopic(),
-                source.getSerializationType(),
-                source.getGroupId(),
-                source.getSessionKey(),
-                source.getTid());
-    }
-
-    /**
-     * Create Redis extract node
-     *
-     * @param source redis source info
-     * @return redis extract source info
-     */
-    public static RedisExtractNode createExtractNode(RedisSource source) {
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        Map<String, String> properties = parseProperties(source.getProperties());
-        RedisMode redisMode = RedisMode.forName(source.getRedisMode());
-        switch (redisMode) {
-            case STANDALONE:
-                return new RedisExtractNode(
-                        source.getSourceName(),
-                        source.getSourceName(),
-                        fieldInfos,
-                        null,
-                        properties,
-                        source.getPrimaryKey(),
-                        RedisCommand.forName(source.getCommand()),
-                        source.getHost(),
-                        source.getPort(),
-                        source.getPassword(),
-                        source.getAdditionalKey(),
-                        source.getDatabase(),
-                        source.getTimeout(),
-                        source.getSoTimeout(),
-                        source.getMaxTotal(),
-                        source.getMaxIdle(),
-                        source.getMinIdle(),
-                        parseLookupOptions(source.getLookupOptions()));
-            case SENTINEL:
-                return new RedisExtractNode(
-                        source.getSourceName(),
-                        source.getSourceName(),
-                        fieldInfos,
-                        null,
-                        properties,
-                        source.getPrimaryKey(),
-                        RedisCommand.forName(source.getCommand()),
-                        source.getMasterName(),
-                        source.getSentinelsInfo(),
-                        source.getPassword(),
-                        source.getAdditionalKey(),
-                        source.getDatabase(),
-                        source.getTimeout(),
-                        source.getSoTimeout(),
-                        source.getMaxTotal(),
-                        source.getMaxIdle(),
-                        source.getMinIdle(),
-                        parseLookupOptions(source.getLookupOptions()));
-            case CLUSTER:
-                return new RedisExtractNode(
-                        source.getSourceName(),
-                        source.getSourceName(),
-                        fieldInfos,
-                        null,
-                        properties,
-                        source.getPrimaryKey(),
-                        RedisCommand.forName(source.getCommand()),
-                        source.getClusterNodes(),
-                        source.getPassword(),
-                        source.getAdditionalKey(),
-                        source.getDatabase(),
-                        source.getTimeout(),
-                        source.getSoTimeout(),
-                        source.getMaxTotal(),
-                        source.getMaxIdle(),
-                        source.getMinIdle(),
-                        parseLookupOptions(source.getLookupOptions()));
-            default:
-                throw new IllegalArgumentException(String.format("Unsupported redis-mode=%s for Inlong", redisMode));
-        }
-
-    }
-
-    /**
-     * Create Hudi extract node
-     *
-     * @param source hudi source info
-     * @return hudi extract source info
-     */
-    public static HudiExtractNode createExtractNode(HudiSource source) {
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        Map<String, String> properties = parseProperties(source.getProperties());
-
-        return new HudiExtractNode(
-                source.getSourceName(),
-                source.getSourceName(),
-                fieldInfos,
-                null,
-                source.getCatalogUri(),
-                source.getWarehouse(),
-                source.getDbName(),
-                source.getTableName(),
-                CatalogType.HIVE,
-                source.getCheckIntervalInMinus(),
-                source.isReadStreamingSkipCompaction(),
-                source.getReadStartCommit(),
-                properties,
-                source.getExtList());
-    }
-
-    /**
-     * Parse format
-     *
-     * @param serializationType data serialization, support: csv, json, canal, avro, etc
-     * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
-     * @param separatorStr the separator of data content
-     * @param ignoreParseErrors whether ignore deserialization error data
-     * @return the format for serialized content
-     */
-    private static Format parsingFormat(
-            String serializationType,
-            boolean wrapWithInlongMsg,
-            String separatorStr,
-            boolean ignoreParseErrors) {
-        Format format;
-        DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
-        switch (dataType) {
-            case CSV:
-                if (StringUtils.isNumeric(separatorStr)) {
-                    char dataSeparator = (char) Integer.parseInt(separatorStr);
-                    separatorStr = Character.toString(dataSeparator);
-                }
-                CsvFormat csvFormat = new CsvFormat(separatorStr);
-                csvFormat.setIgnoreParseErrors(ignoreParseErrors);
-                format = csvFormat;
-                break;
-            case AVRO:
-                format = new AvroFormat();
-                break;
-            case JSON:
-                JsonFormat jsonFormat = new JsonFormat();
-                jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
-                format = jsonFormat;
-                break;
-            case CANAL:
-                format = new CanalJsonFormat();
-                break;
-            case DEBEZIUM_JSON:
-                DebeziumJsonFormat debeziumJsonFormat = new DebeziumJsonFormat();
-                debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
-                format = debeziumJsonFormat;
-                break;
-            case RAW:
-                format = new RawFormat();
-                break;
-            default:
-                throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
-        }
-        if (wrapWithInlongMsg) {
-            Format innerFormat = format;
-            format = new InLongMsgFormat(innerFormat, false);
-        }
-        return format;
-    }
-
-    /**
-     * Parse FieldInfos
-     *
-     * @param streamFields The stream fields
-     * @param nodeId The node id
-     * @return FieldInfo list
-     */
-    private static List<FieldInfo> parseFieldInfos(List<StreamField> streamFields, String nodeId) {
-        // Filter constant fields
-        return streamFields.stream().filter(s -> s.getFieldValue() == null)
-                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
-                .collect(Collectors.toList());
-    }
-
-    /**
-     * Parse properties
-     *
-     * @param properties The properties with string key and object value
-     * @return The properties with string key and string value
-     */
-    private static Map<String, String> parseProperties(Map<String, Object> properties) {
-        return properties.entrySet().stream()
-                .filter(v -> Objects.nonNull(v.getValue()))
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-    }
-
-    /**
-     * Parse LookupOptions
-     *
-     * @param options
-     * @return LookupOptions
-     */
-    private static LookupOptions parseLookupOptions(RedisLookupOptions options) {
-        if (options == null) {
-            return null;
-        }
-        return new LookupOptions(options.getLookupCacheMaxRows(), options.getLookupCacheTtl(),
-                options.getLookupMaxRetries(), options.getLookupAsync());
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 86a33f53d8..d23f08ddf3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -21,7 +21,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
-import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils;
+import org.apache.inlong.manager.pojo.sort.node.NodeProviderFactory;
 import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
 import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
 import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
@@ -211,7 +211,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses,
             List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
         List<Node> nodes = new ArrayList<>();
-        nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
+        nodes.addAll(NodeProviderFactory.createExtractNodes(sources));
         nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
         nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
         return nodes;

Reply via email to