This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0318b853d1 [INLONG-8154][Manager] Use the factory pattern to optimize
the way of creating ExtractNodes (#8162)
0318b853d1 is described below
commit 0318b853d11e3badd06fba75ecee7218cbf42fb0
Author: chestnufang <[email protected]>
AuthorDate: Tue Jun 6 15:35:54 2023 +0800
[INLONG-8154][Manager] Use the factory pattern to optimize the way of
creating ExtractNodes (#8162)
Co-authored-by: chestnufang <[email protected]>
---
.../pojo/sort/node/ExtractNodeProvider.java | 130 +++++
.../manager/pojo/sort/node/NodeProvider.java | 40 ++
.../pojo/sort/node/NodeProviderFactory.java | 93 ++++
.../pojo/sort/node/extract/HudiProvider.java | 64 +++
.../pojo/sort/node/extract/KafkaProvider.java | 93 ++++
.../pojo/sort/node/extract/MongoProvider.java | 59 +++
.../sort/node/extract/MysqlBinlogProvider.java | 85 +++
.../pojo/sort/node/extract/OracleProvider.java | 69 +++
.../pojo/sort/node/extract/PostgreSqlProvider.java | 64 +++
.../pojo/sort/node/extract/PulsarProvider.java | 81 +++
.../pojo/sort/node/extract/RedisProvider.java | 130 +++++
.../pojo/sort/node/extract/SqlServerProvider.java | 63 +++
.../pojo/sort/node/extract/TubeMqProvider.java | 60 +++
.../manager/pojo/sort/util/ExtractNodeUtils.java | 585 ---------------------
.../resource/sort/DefaultSortConfigOperator.java | 4 +-
15 files changed, 1033 insertions(+), 587 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
new file mode 100644
index 0000000000..aeed2df4c8
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.RawFormat;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Interface of the extract node provider
+ */
+public interface ExtractNodeProvider extends NodeProvider {
+
+ /**
+ * Determines whether the current instance matches the specified type.
+ *
+ * @param sourceType the specified source type
+ * @return whether the current instance matches the specified type
+ */
+ Boolean accept(String sourceType);
+
+ /**
+ * Create extract node by stream node info
+ *
+ * @param nodeInfo stream node info
+ * @return the extract node
+ */
+ ExtractNode createNode(StreamNode nodeInfo);
+
+ /**
+ * Parse FieldInfos
+ *
+ * @param streamFields The stream fields
+ * @param nodeId The node id
+ * @return FieldInfo list
+ */
+ default List<FieldInfo> parseFieldInfos(List<StreamField> streamFields,
String nodeId) {
+ // Filter constant fields
+ return streamFields.stream().filter(s ->
Objects.isNull(s.getFieldValue()))
+ .map(streamFieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Parse format
+ *
+ * @param serializationType data serialization, support: csv, json, canal,
avro, etc
+ * @param wrapWithInlongMsg whether wrap content with {@link
InLongMsgFormat}
+ * @param separatorStr the separator of data content
+ * @param ignoreParseErrors whether ignore deserialization error data
+ * @return the format for serialized content
+ */
+ default Format parsingFormat(
+ String serializationType,
+ boolean wrapWithInlongMsg,
+ String separatorStr,
+ boolean ignoreParseErrors) {
+ Format format;
+ DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
+ switch (dataType) {
+ case CSV:
+ if (StringUtils.isNumeric(separatorStr)) {
+ char dataSeparator = (char) Integer.parseInt(separatorStr);
+ separatorStr = Character.toString(dataSeparator);
+ }
+ CsvFormat csvFormat = new CsvFormat(separatorStr);
+ csvFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = csvFormat;
+ break;
+ case AVRO:
+ format = new AvroFormat();
+ break;
+ case JSON:
+ JsonFormat jsonFormat = new JsonFormat();
+ jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = jsonFormat;
+ break;
+ case CANAL:
+ format = new CanalJsonFormat();
+ break;
+ case DEBEZIUM_JSON:
+ DebeziumJsonFormat debeziumJsonFormat = new
DebeziumJsonFormat();
+ debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = debeziumJsonFormat;
+ break;
+ case RAW:
+ format = new RawFormat();
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported
dataType=%s", dataType));
+ }
+ if (wrapWithInlongMsg) {
+ Format innerFormat = format;
+ format = new InLongMsgFormat(innerFormat, false);
+ }
+ return format;
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
new file mode 100644
index 0000000000..f8f7efd030
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Interface of the node provider
+ */
+public interface NodeProvider {
+
+ /**
+ * Parse properties
+ *
+ * @param properties The properties with string key and object value
+ * @return The properties with string key and string value
+ */
+ default Map<String, String> parseProperties(Map<String, Object>
properties) {
+ return properties.entrySet().stream()
+ .filter(v -> Objects.nonNull(v.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
new file mode 100644
index 0000000000..35c447ec7e
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.sort.node.extract.HudiProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.KafkaProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.MongoProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.MysqlBinlogProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.OracleProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.PostgreSqlProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.PulsarProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.RedisProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.SqlServerProvider;
+import org.apache.inlong.manager.pojo.sort.node.extract.TubeMqProvider;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The node provider factory.
+ */
+public class NodeProviderFactory {
+
+ /**
+ * The extract node provider collection
+ */
+ private static final List<ExtractNodeProvider> EXTRACT_NODE_PROVIDER_LIST
= new ArrayList<>();
+
+ static {
+ // The Providers Parsing SourceInfo to ExtractNode which sort needed
+ EXTRACT_NODE_PROVIDER_LIST.add(new HudiProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new KafkaProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new MongoProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new OracleProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new PulsarProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new RedisProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new TubeMqProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new SqlServerProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new PostgreSqlProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new MysqlBinlogProvider());
+
+ }
+
+ /**
+ * Get extract node provider
+ *
+ * @param sourceType the specified source type
+ * @return the extract node provider
+ */
+ public static ExtractNodeProvider getExtractNodeProvider(String
sourceType) {
+ return EXTRACT_NODE_PROVIDER_LIST.stream()
+ .filter(inst -> inst.accept(sourceType))
+ .findFirst()
+ .orElseThrow(() -> new
BusinessException(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT,
+
String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT.getMessage(), sourceType)));
+ }
+
+ /**
+ * Create extract nodes from the given sources.
+ */
+ public static List<ExtractNode> createExtractNodes(List<StreamSource>
sourceInfos) {
+ if (CollectionUtils.isEmpty(sourceInfos)) {
+ return Lists.newArrayList();
+ }
+ return sourceInfos.stream().map(v -> {
+ String sourceType = v.getSourceType();
+ return getExtractNodeProvider(sourceType).createNode(v);
+ }).collect(Collectors.toList());
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
new file mode 100644
index 0000000000..ebbd00e326
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.hudi.HudiSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Hudi extract nodes.
+ */
+public class HudiProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.HUDI.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ HudiSource source = (HudiSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
+ Map<String, String> properties =
parseProperties(source.getProperties());
+
+ return new HudiExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ source.getCatalogUri(),
+ source.getWarehouse(),
+ source.getDbName(),
+ source.getTableName(),
+ CatalogType.HIVE,
+ source.getCheckIntervalInMinus(),
+ source.isReadStreamingSkipCompaction(),
+ source.getReadStartCommit(),
+ properties,
+ source.getExtList());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
new file mode 100644
index 0000000000..88b63f28e7
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Kafka extract nodes.
+ */
+public class KafkaProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.KAFKA.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ KafkaSource kafkaSource = (KafkaSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos =
parseFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
+ Map<String, String> properties =
parseProperties(kafkaSource.getProperties());
+
+ String topic = kafkaSource.getTopic();
+ String bootstrapServers = kafkaSource.getBootstrapServers();
+
+ Format format = parsingFormat(
+ kafkaSource.getSerializationType(),
+ kafkaSource.isWrapWithInlongMsg(),
+ kafkaSource.getDataSeparator(),
+ kafkaSource.isIgnoreParseErrors());
+
+ KafkaOffset kafkaOffset =
KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
+ KafkaScanStartupMode startupMode;
+ switch (kafkaOffset) {
+ case EARLIEST:
+ startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
+ break;
+ case SPECIFIC:
+ startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
+ break;
+ case TIMESTAMP_MILLIS:
+ startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
+ break;
+ case LATEST:
+ default:
+ startupMode = KafkaScanStartupMode.LATEST_OFFSET;
+ }
+ final String primaryKey = kafkaSource.getPrimaryKey();
+ String groupId = kafkaSource.getGroupId();
+ String partitionOffset = kafkaSource.getPartitionOffsets();
+ String scanTimestampMillis = kafkaSource.getTimestampMillis();
+ return new KafkaExtractNode(kafkaSource.getSourceName(),
+ kafkaSource.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ topic,
+ bootstrapServers,
+ format,
+ startupMode,
+ primaryKey,
+ groupId,
+ partitionOffset,
+ scanTimestampMillis);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
new file mode 100644
index 0000000000..72013e2386
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.mongodb.MongoDBSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Mongo extract nodes.
+ */
+public class MongoProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.MONGODB.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ MongoDBSource source = (MongoDBSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
+ Map<String, String> properties =
parseProperties(source.getProperties());
+
+ return new MongoExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getCollection(),
+ source.getHosts(),
+ source.getUsername(),
+ source.getPassword(),
+ source.getDatabase());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
new file mode 100644
index 0000000000..9732a8d277
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+
+import com.google.common.base.Splitter;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating MysqlBinlog extract nodes.
+ */
+public class MysqlBinlogProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.MYSQL_BINLOG.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ MySQLBinlogSource binlogSource = (MySQLBinlogSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos =
parseFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
+ Map<String, String> properties =
parseProperties(binlogSource.getProperties());
+
+ final String database = binlogSource.getDatabaseWhiteList();
+ final String primaryKey = binlogSource.getPrimaryKey();
+ final String hostName = binlogSource.getHostname();
+ final String username = binlogSource.getUser();
+ final String password = binlogSource.getPassword();
+ final Integer port = binlogSource.getPort();
+ Integer serverId = null;
+ if (binlogSource.getServerId() != null && binlogSource.getServerId() >
0) {
+ serverId = binlogSource.getServerId();
+ }
+ String tables = binlogSource.getTableWhiteList();
+ final List<String> tableNames = Splitter.on(",").splitToList(tables);
+ final String serverTimeZone = binlogSource.getServerTimezone();
+
+ // TODO Needs to be configurable for those parameters
+ if (binlogSource.isAllMigration()) {
+ // Unique properties when migrate all tables in database
+ properties.put("migrate-all", "true");
+ }
+
+ return new MySqlExtractNode(binlogSource.getSourceName(),
+ binlogSource.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ primaryKey,
+ tableNames,
+ hostName,
+ username,
+ password,
+ database,
+ port,
+ serverId,
+ true,
+ serverTimeZone);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
new file mode 100644
index 0000000000..72993b723a
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Oracle extract nodes.
+ */
+public class OracleProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.ORACLE.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ OracleSource source = (OracleSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
+ Map<String, String> properties =
parseProperties(source.getProperties());
+
+ ScanStartUpMode scanStartupMode =
StringUtils.isBlank(source.getScanStartupMode())
+ ? null
+ : ScanStartUpMode.forName(source.getScanStartupMode());
+ return new OracleExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ source.getHostname(),
+ source.getUsername(),
+ source.getPassword(),
+ source.getDatabase(),
+ source.getSchemaName(),
+ source.getTableName(),
+ source.getPort(),
+ scanStartupMode);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
new file mode 100644
index 0000000000..128c30018e
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating PostgreSql extract nodes.
+ */
+public class PostgreSqlProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.POSTGRESQL.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ PostgreSQLSource postgreSQLSource = (PostgreSQLSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos =
parseFieldInfos(postgreSQLSource.getFieldList(),
postgreSQLSource.getSourceName());
+ Map<String, String> properties =
parseProperties(postgreSQLSource.getProperties());
+
+ return new PostgresExtractNode(postgreSQLSource.getSourceName(),
+ postgreSQLSource.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ postgreSQLSource.getPrimaryKey(),
+ postgreSQLSource.getTableNameList(),
+ postgreSQLSource.getHostname(),
+ postgreSQLSource.getUsername(),
+ postgreSQLSource.getPassword(),
+ postgreSQLSource.getDatabase(),
+ postgreSQLSource.getSchema(),
+ postgreSQLSource.getPort(),
+ postgreSQLSource.getDecodingPluginName(),
+ postgreSQLSource.getServerTimeZone(),
+ postgreSQLSource.getScanStartupMode());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
new file mode 100644
index 0000000000..14cde851b7
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Pulsar extract nodes.
+ */
+public class PulsarProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.PULSAR.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ PulsarSource pulsarSource = (PulsarSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos =
parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
+ Map<String, String> properties =
parseProperties(pulsarSource.getProperties());
+
+ String fullTopicName =
+ pulsarSource.getPulsarTenant() + "/" +
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
+
+ Format format = parsingFormat(pulsarSource.getSerializationType(),
+ pulsarSource.isWrapWithInlongMsg(),
+ pulsarSource.getDataSeparator(),
+ pulsarSource.isIgnoreParseError());
+
+ PulsarScanStartupMode startupMode =
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
+ final String primaryKey = pulsarSource.getPrimaryKey();
+ final String serviceUrl = pulsarSource.getServiceUrl();
+ final String adminUrl = pulsarSource.getAdminUrl();
+ final String scanStartupSubStartOffset =
+ StringUtils.isNotBlank(pulsarSource.getSubscription()) ?
PulsarScanStartupMode.EARLIEST.getValue()
+ : null;
+
+ return new PulsarExtractNode(pulsarSource.getSourceName(),
+ pulsarSource.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ fullTopicName,
+ adminUrl,
+ serviceUrl,
+ format,
+ startupMode.getValue(),
+ primaryKey,
+ pulsarSource.getSubscription(),
+ scanStartupSubStartOffset);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
new file mode 100644
index 0000000000..18f435e89f
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
+import org.apache.inlong.manager.pojo.source.redis.RedisSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.LookupOptions;
+import org.apache.inlong.sort.protocol.enums.RedisCommand;
+import org.apache.inlong.sort.protocol.enums.RedisMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Redis extract nodes.
+ */
+public class RedisProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.REDIS.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ RedisSource source = (RedisSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
+ Map<String, String> properties =
parseProperties(source.getProperties());
+
+ RedisMode redisMode = RedisMode.forName(source.getRedisMode());
+ switch (redisMode) {
+ case STANDALONE:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getHost(),
+ source.getPort(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ case SENTINEL:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getMasterName(),
+ source.getSentinelsInfo(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ case CLUSTER:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getClusterNodes(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported
redis-mode=%s for Inlong", redisMode));
+ }
+ }
+
+ /**
+ * Parse LookupOptions
+ *
+ * @param options RedisLookupOptions
+ * @return LookupOptions
+ */
+ private static LookupOptions parseLookupOptions(RedisLookupOptions
options) {
+ if (options == null) {
+ return null;
+ }
+ return new LookupOptions(options.getLookupCacheMaxRows(),
options.getLookupCacheTtl(),
+ options.getLookupMaxRetries(), options.getLookupAsync());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
new file mode 100644
index 0000000000..f9ca5f985a
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating SQLServer extract nodes.
+ */
+public class SqlServerProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.SQLSERVER.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ SQLServerSource source = (SQLServerSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
+ Map<String, String> properties =
parseProperties(source.getProperties());
+
+ return new SqlServerExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ source.getHostname(),
+ source.getPort(),
+ source.getUsername(),
+ source.getPassword(),
+ source.getDatabase(),
+ source.getSchemaName(),
+ source.getTableName(),
+ source.getServerTimezone());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
new file mode 100644
index 0000000000..2a61b385a1
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.extract;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating TubeMQ extract nodes.
+ */
+public class TubeMqProvider implements ExtractNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.TUBEMQ.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createNode(StreamNode streamNodeInfo) {
+ TubeMQSource source = (TubeMQSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
+ Map<String, String> properties =
parseProperties(source.getProperties());
+
+ return new TubeMQExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getMasterRpc(),
+ source.getTopic(),
+ source.getSerializationType(),
+ source.getGroupId(),
+ source.getSessionKey(),
+ source.getTid());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
deleted file mode 100644
index 44ea1b4fb5..0000000000
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.util;
-
-import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.hudi.HudiSource;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.pojo.source.mongodb.MongoDBSource;
-import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
-import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
-import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
-import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
-import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
-import org.apache.inlong.manager.pojo.source.redis.RedisSource;
-import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
-import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
-import org.apache.inlong.manager.pojo.stream.StreamField;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.LookupOptions;
-import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
-import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
-import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
-import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
-import org.apache.inlong.sort.protocol.enums.RedisCommand;
-import org.apache.inlong.sort.protocol.enums.RedisMode;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
-import org.apache.inlong.sort.protocol.node.format.AvroFormat;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.CsvFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.Format;
-import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
-import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.node.format.RawFormat;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Parse SourceInfo to ExtractNode which sort needed
- */
-@Slf4j
-public class ExtractNodeUtils {
-
- /**
- * Create extract nodes from the given sources.
- */
- public static List<ExtractNode> createExtractNodes(List<StreamSource>
sourceInfos) {
- if (CollectionUtils.isEmpty(sourceInfos)) {
- return Lists.newArrayList();
- }
- return sourceInfos.stream().map(ExtractNodeUtils::createExtractNode)
- .collect(Collectors.toList());
- }
-
- public static ExtractNode createExtractNode(StreamSource sourceInfo) {
- String sourceType = sourceInfo.getSourceType();
- switch (sourceType) {
- case SourceType.MYSQL_BINLOG:
- return createExtractNode((MySQLBinlogSource) sourceInfo);
- case SourceType.KAFKA:
- return createExtractNode((KafkaSource) sourceInfo);
- case SourceType.PULSAR:
- return createExtractNode((PulsarSource) sourceInfo);
- case SourceType.POSTGRESQL:
- return createExtractNode((PostgreSQLSource) sourceInfo);
- case SourceType.ORACLE:
- return createExtractNode((OracleSource) sourceInfo);
- case SourceType.SQLSERVER:
- return createExtractNode((SQLServerSource) sourceInfo);
- case SourceType.MONGODB:
- return createExtractNode((MongoDBSource) sourceInfo);
- case SourceType.TUBEMQ:
- return createExtractNode((TubeMQSource) sourceInfo);
- case SourceType.REDIS:
- return createExtractNode((RedisSource) sourceInfo);
- case SourceType.HUDI:
- return createExtractNode((HudiSource) sourceInfo);
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported sourceType=%s to create
extractNode", sourceType));
- }
- }
-
- /**
- * Create MySql extract node
- *
- * @param binlogSource MySql binlog source info
- * @return MySql extract node info
- */
- public static MySqlExtractNode createExtractNode(MySQLBinlogSource
binlogSource) {
- final String database = binlogSource.getDatabaseWhiteList();
- final String primaryKey = binlogSource.getPrimaryKey();
- final String hostName = binlogSource.getHostname();
- final String username = binlogSource.getUser();
- final String password = binlogSource.getPassword();
- final Integer port = binlogSource.getPort();
- Integer serverId = null;
- if (binlogSource.getServerId() != null && binlogSource.getServerId() >
0) {
- serverId = binlogSource.getServerId();
- }
- String tables = binlogSource.getTableWhiteList();
- final List<String> tableNames = Splitter.on(",").splitToList(tables);
- List<FieldInfo> fieldInfos =
parseFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
- final String serverTimeZone = binlogSource.getServerTimezone();
-
- // TODO Needs to be configurable for those parameters
- Map<String, String> properties =
parseProperties(binlogSource.getProperties());
- if (binlogSource.isAllMigration()) {
- // Unique properties when migrate all tables in database
- properties.put("migrate-all", "true");
- }
-
- return new MySqlExtractNode(binlogSource.getSourceName(),
- binlogSource.getSourceName(),
- fieldInfos,
- null,
- properties,
- primaryKey,
- tableNames,
- hostName,
- username,
- password,
- database,
- port,
- serverId,
- true,
- serverTimeZone);
- }
-
- /**
- * Create Kafka extract node
- *
- * @param kafkaSource Kafka source info
- * @return Kafka extract node info
- */
- public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
- List<FieldInfo> fieldInfos =
parseFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
- String topic = kafkaSource.getTopic();
- String bootstrapServers = kafkaSource.getBootstrapServers();
-
- Format format = parsingFormat(
- kafkaSource.getSerializationType(),
- kafkaSource.isWrapWithInlongMsg(),
- kafkaSource.getDataSeparator(),
- kafkaSource.isIgnoreParseErrors());
-
- KafkaOffset kafkaOffset =
KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
- KafkaScanStartupMode startupMode;
- switch (kafkaOffset) {
- case EARLIEST:
- startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
- break;
- case SPECIFIC:
- startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
- break;
- case TIMESTAMP_MILLIS:
- startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
- break;
- case LATEST:
- default:
- startupMode = KafkaScanStartupMode.LATEST_OFFSET;
- }
- final String primaryKey = kafkaSource.getPrimaryKey();
- String groupId = kafkaSource.getGroupId();
- Map<String, String> properties =
parseProperties(kafkaSource.getProperties());
- String partitionOffset = kafkaSource.getPartitionOffsets();
- String scanTimestampMillis = kafkaSource.getTimestampMillis();
- return new KafkaExtractNode(kafkaSource.getSourceName(),
- kafkaSource.getSourceName(),
- fieldInfos,
- null,
- properties,
- topic,
- bootstrapServers,
- format,
- startupMode,
- primaryKey,
- groupId,
- partitionOffset,
- scanTimestampMillis);
- }
-
- /**
- * Create Pulsar extract node
- *
- * @param pulsarSource Pulsar source info
- * @return Pulsar extract node info
- */
- public static PulsarExtractNode createExtractNode(PulsarSource
pulsarSource) {
- List<FieldInfo> fieldInfos =
parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
- String fullTopicName =
- pulsarSource.getPulsarTenant() + "/" +
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
-
- Format format = parsingFormat(pulsarSource.getSerializationType(),
- pulsarSource.isWrapWithInlongMsg(),
- pulsarSource.getDataSeparator(),
- pulsarSource.isIgnoreParseError());
-
- PulsarScanStartupMode startupMode =
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
- final String primaryKey = pulsarSource.getPrimaryKey();
- final String serviceUrl = pulsarSource.getServiceUrl();
- final String adminUrl = pulsarSource.getAdminUrl();
- final String scanStartupSubStartOffset =
- StringUtils.isNotBlank(pulsarSource.getSubscription()) ?
PulsarScanStartupMode.EARLIEST.getValue()
- : null;
- Map<String, String> properties =
parseProperties(pulsarSource.getProperties());
-
- return new PulsarExtractNode(pulsarSource.getSourceName(),
- pulsarSource.getSourceName(),
- fieldInfos,
- null,
- properties,
- fullTopicName,
- adminUrl,
- serviceUrl,
- format,
- startupMode.getValue(),
- primaryKey,
- pulsarSource.getSubscription(),
- scanStartupSubStartOffset);
- }
-
- /**
- * Create PostgreSQL extract node
- *
- * @param postgreSQLSource PostgreSQL source info
- * @return PostgreSQL extract node info
- */
- public static PostgresExtractNode createExtractNode(PostgreSQLSource
postgreSQLSource) {
- List<FieldInfo> fieldInfos =
parseFieldInfos(postgreSQLSource.getFieldList(),
postgreSQLSource.getSourceName());
- Map<String, String> properties =
parseProperties(postgreSQLSource.getProperties());
- return new PostgresExtractNode(postgreSQLSource.getSourceName(),
- postgreSQLSource.getSourceName(),
- fieldInfos,
- null,
- properties,
- postgreSQLSource.getPrimaryKey(),
- postgreSQLSource.getTableNameList(),
- postgreSQLSource.getHostname(),
- postgreSQLSource.getUsername(),
- postgreSQLSource.getPassword(),
- postgreSQLSource.getDatabase(),
- postgreSQLSource.getSchema(),
- postgreSQLSource.getPort(),
- postgreSQLSource.getDecodingPluginName(),
- postgreSQLSource.getServerTimeZone(),
- postgreSQLSource.getScanStartupMode());
- }
-
- /**
- * Create Oracle extract node
- *
- * @param source Oracle source info
- * @return oracle extract node info
- */
- public static OracleExtractNode createExtractNode(OracleSource source) {
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
- ScanStartUpMode scanStartupMode =
StringUtils.isBlank(source.getScanStartupMode())
- ? null
- : ScanStartUpMode.forName(source.getScanStartupMode());
- Map<String, String> properties =
parseProperties(source.getProperties());
- return new OracleExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- source.getHostname(),
- source.getUsername(),
- source.getPassword(),
- source.getDatabase(),
- source.getSchemaName(),
- source.getTableName(),
- source.getPort(),
- scanStartupMode);
- }
-
- /**
- * Create SQLServer extract node
- *
- * @param source SQLServer source info
- * @return SQLServer extract node info
- */
- public static SqlServerExtractNode createExtractNode(SQLServerSource
source) {
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
- Map<String, String> properties =
parseProperties(source.getProperties());
- return new SqlServerExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- source.getHostname(),
- source.getPort(),
- source.getUsername(),
- source.getPassword(),
- source.getDatabase(),
- source.getSchemaName(),
- source.getTableName(),
- source.getServerTimezone());
- }
-
- /**
- * Create MongoDB extract node
- *
- * @param source MongoDB source info
- * @return MongoDB extract node info
- */
- public static MongoExtractNode createExtractNode(MongoDBSource source) {
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
- Map<String, String> properties =
parseProperties(source.getProperties());
- return new MongoExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getCollection(),
- source.getHosts(),
- source.getUsername(),
- source.getPassword(),
- source.getDatabase());
- }
-
- /**
- * Create TubeMQ extract node
- *
- * @param source TubeMQ source info
- * @return TubeMQ extract node info
- */
- public static TubeMQExtractNode createExtractNode(TubeMQSource source) {
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
- Map<String, String> properties =
parseProperties(source.getProperties());
- return new TubeMQExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getMasterRpc(),
- source.getTopic(),
- source.getSerializationType(),
- source.getGroupId(),
- source.getSessionKey(),
- source.getTid());
- }
-
- /**
- * Create Redis extract node
- *
- * @param source redis source info
- * @return redis extract source info
- */
- public static RedisExtractNode createExtractNode(RedisSource source) {
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
- Map<String, String> properties =
parseProperties(source.getProperties());
- RedisMode redisMode = RedisMode.forName(source.getRedisMode());
- switch (redisMode) {
- case STANDALONE:
- return new RedisExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- RedisCommand.forName(source.getCommand()),
- source.getHost(),
- source.getPort(),
- source.getPassword(),
- source.getAdditionalKey(),
- source.getDatabase(),
- source.getTimeout(),
- source.getSoTimeout(),
- source.getMaxTotal(),
- source.getMaxIdle(),
- source.getMinIdle(),
- parseLookupOptions(source.getLookupOptions()));
- case SENTINEL:
- return new RedisExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- RedisCommand.forName(source.getCommand()),
- source.getMasterName(),
- source.getSentinelsInfo(),
- source.getPassword(),
- source.getAdditionalKey(),
- source.getDatabase(),
- source.getTimeout(),
- source.getSoTimeout(),
- source.getMaxTotal(),
- source.getMaxIdle(),
- source.getMinIdle(),
- parseLookupOptions(source.getLookupOptions()));
- case CLUSTER:
- return new RedisExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- RedisCommand.forName(source.getCommand()),
- source.getClusterNodes(),
- source.getPassword(),
- source.getAdditionalKey(),
- source.getDatabase(),
- source.getTimeout(),
- source.getSoTimeout(),
- source.getMaxTotal(),
- source.getMaxIdle(),
- source.getMinIdle(),
- parseLookupOptions(source.getLookupOptions()));
- default:
- throw new IllegalArgumentException(String.format("Unsupported
redis-mode=%s for Inlong", redisMode));
- }
-
- }
-
- /**
- * Create Hudi extract node
- *
- * @param source hudi source info
- * @return hudi extract source info
- */
- public static HudiExtractNode createExtractNode(HudiSource source) {
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(),
source.getSourceName());
- Map<String, String> properties =
parseProperties(source.getProperties());
-
- return new HudiExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- source.getCatalogUri(),
- source.getWarehouse(),
- source.getDbName(),
- source.getTableName(),
- CatalogType.HIVE,
- source.getCheckIntervalInMinus(),
- source.isReadStreamingSkipCompaction(),
- source.getReadStartCommit(),
- properties,
- source.getExtList());
- }
-
- /**
- * Parse format
- *
- * @param serializationType data serialization, support: csv, json, canal,
avro, etc
- * @param wrapWithInlongMsg whether wrap content with {@link
InLongMsgFormat}
- * @param separatorStr the separator of data content
- * @param ignoreParseErrors whether ignore deserialization error data
- * @return the format for serialized content
- */
- private static Format parsingFormat(
- String serializationType,
- boolean wrapWithInlongMsg,
- String separatorStr,
- boolean ignoreParseErrors) {
- Format format;
- DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
- switch (dataType) {
- case CSV:
- if (StringUtils.isNumeric(separatorStr)) {
- char dataSeparator = (char) Integer.parseInt(separatorStr);
- separatorStr = Character.toString(dataSeparator);
- }
- CsvFormat csvFormat = new CsvFormat(separatorStr);
- csvFormat.setIgnoreParseErrors(ignoreParseErrors);
- format = csvFormat;
- break;
- case AVRO:
- format = new AvroFormat();
- break;
- case JSON:
- JsonFormat jsonFormat = new JsonFormat();
- jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
- format = jsonFormat;
- break;
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- DebeziumJsonFormat debeziumJsonFormat = new
DebeziumJsonFormat();
- debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
- format = debeziumJsonFormat;
- break;
- case RAW:
- format = new RawFormat();
- break;
- default:
- throw new IllegalArgumentException(String.format("Unsupported
dataType=%s", dataType));
- }
- if (wrapWithInlongMsg) {
- Format innerFormat = format;
- format = new InLongMsgFormat(innerFormat, false);
- }
- return format;
- }
-
- /**
- * Parse FieldInfos
- *
- * @param streamFields The stream fields
- * @param nodeId The node id
- * @return FieldInfo list
- */
- private static List<FieldInfo> parseFieldInfos(List<StreamField>
streamFields, String nodeId) {
- // Filter constant fields
- return streamFields.stream().filter(s -> s.getFieldValue() == null)
- .map(streamFieldInfo ->
FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
- .collect(Collectors.toList());
- }
-
- /**
- * Parse properties
- *
- * @param properties The properties with string key and object value
- * @return The properties with string key and string value
- */
- private static Map<String, String> parseProperties(Map<String, Object>
properties) {
- return properties.entrySet().stream()
- .filter(v -> Objects.nonNull(v.getValue()))
- .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().toString()));
- }
-
- /**
- * Parse LookupOptions
- *
- * @param options
- * @return LookupOptions
- */
- private static LookupOptions parseLookupOptions(RedisLookupOptions
options) {
- if (options == null) {
- return null;
- }
- return new LookupOptions(options.getLookupCacheMaxRows(),
options.getLookupCacheTtl(),
- options.getLookupMaxRetries(), options.getLookupAsync());
- }
-
-}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 86a33f53d8..d23f08ddf3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -21,7 +21,7 @@ import
org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
-import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils;
+import org.apache.inlong.manager.pojo.sort.node.NodeProviderFactory;
import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
@@ -211,7 +211,7 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
private List<Node> createNodes(List<StreamSource> sources,
List<TransformResponse> transformResponses,
List<StreamSink> sinks, Map<String, StreamField> constantFieldMap)
{
List<Node> nodes = new ArrayList<>();
- nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
+ nodes.addAll(NodeProviderFactory.createExtractNodes(sources));
nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses,
constantFieldMap));
nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
return nodes;