This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 396a6a5a60 [INLONG-8202][Manager] Optimize Provider code to extract
public parseFormat class. (#8203)
396a6a5a60 is described below
commit 396a6a5a601972fbff3a12f52dc2b0286dd1c6a9
Author: chestnufang <[email protected]>
AuthorDate: Fri Jun 9 17:32:00 2023 +0800
[INLONG-8202][Manager] Optimize Provider code to extract public parseFormat
class. (#8203)
---
.../pojo/sort/node/base/LoadNodeProvider.java | 29 +++++++
.../pojo/sort/node/provider/DorisProvider.java | 22 +----
.../sort/node/provider/ElasticsearchProvider.java | 25 +-----
.../pojo/sort/node/provider/KafkaProvider.java | 96 +++++++++++++---------
.../pojo/sort/node/provider/StarRocksProvider.java | 23 +-----
5 files changed, 95 insertions(+), 100 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index 2302c15943..2dd27f3416 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.sort.node.base;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
@@ -25,6 +26,9 @@ import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.formats.common.StringTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
@@ -97,4 +101,29 @@ public interface LoadNodeProvider extends NodeProvider {
return new FieldRelation(inputField, outputField);
}).collect(Collectors.toList());
}
+
+ /**
+ * Parse the multi-write serialization format; only CANAL and DEBEZIUM_JSON are accepted.
+ * Any other resolved data type is rejected with an IllegalArgumentException.
+ * @param multipleEnable whether to enable multi-write
+ * @param multipleFormat data serialization format, only read when multi-write is enabled
+ * @return the format for serialized content, or null if multi-write is off or the format is blank
+ */
+ default Format parsingSinkMultipleFormat(Boolean multipleEnable, String
multipleFormat) {
+ Format format = null;
+ if (Boolean.TRUE.equals(multipleEnable) &&
StringUtils.isNotBlank(multipleFormat)) {
+ DataTypeEnum dataType = DataTypeEnum.forType(multipleFormat);
+ switch (dataType) {
+ case CANAL:
+ format = new CanalJsonFormat();
+ break;
+ case DEBEZIUM_JSON:
+ format = new DebeziumJsonFormat();
+ break;
+ default:
+ throw new
IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
+ }
+ }
+ return format;
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
index 2586895f8a..e4e71333c9 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
-import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
@@ -25,14 +24,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.List;
import java.util.Map;
@@ -52,21 +47,8 @@ public class DorisProvider implements LoadNodeProvider {
Map<String, String> properties =
parseProperties(dorisSink.getProperties());
List<FieldInfo> fieldInfos =
parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName());
List<FieldRelation> fieldRelations =
parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap);
- Format format = null;
- if (dorisSink.getSinkMultipleEnable() != null &&
dorisSink.getSinkMultipleEnable() && StringUtils.isNotBlank(
- dorisSink.getSinkMultipleFormat())) {
- DataTypeEnum dataType =
DataTypeEnum.forType(dorisSink.getSinkMultipleFormat());
- switch (dataType) {
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- format = new DebeziumJsonFormat();
- break;
- default:
- throw new
IllegalArgumentException(String.format("Unsupported dataType=%s for doris",
dataType));
- }
- }
+ Format format =
parsingSinkMultipleFormat(dorisSink.getSinkMultipleEnable(),
dorisSink.getSinkMultipleFormat());
+
return new DorisLoadNode(
dorisSink.getSinkName(),
dorisSink.getSinkName(),
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
index 5072f9ac46..80861cbc59 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
-import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
@@ -26,14 +25,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.List;
import java.util.Map;
@@ -54,23 +49,9 @@ public class ElasticsearchProvider implements
LoadNodeProvider {
List<SinkField> sinkFieldList = elasticsearchSink.getSinkFieldList();
List<FieldInfo> fieldInfos = parseSinkFieldInfos(sinkFieldList,
elasticsearchSink.getSinkName());
List<FieldRelation> fieldRelations = parseSinkFields(sinkFieldList,
constantFieldMap);
- Format format = null;
- if (elasticsearchSink.getSinkMultipleEnable() != null &&
elasticsearchSink.getSinkMultipleEnable()
- && StringUtils.isNotBlank(
- elasticsearchSink.getSinkMultipleFormat())) {
- DataTypeEnum dataType =
DataTypeEnum.forType(elasticsearchSink.getSinkMultipleFormat());
- switch (dataType) {
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- format = new DebeziumJsonFormat();
- break;
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported dataType=%s for
elasticsearch", dataType));
- }
- }
+ Format format =
parsingSinkMultipleFormat(elasticsearchSink.getSinkMultipleEnable(),
+ elasticsearchSink.getSinkMultipleFormat());
+
return new ElasticsearchLoadNode(
elasticsearchSink.getSinkName(),
elasticsearchSink.getSinkName(),
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index adb1d911e1..bb355b30c6 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -63,31 +63,16 @@ public class KafkaProvider implements ExtractNodeProvider,
LoadNodeProvider {
List<FieldInfo> fieldInfos =
parseStreamFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
Map<String, String> properties =
parseProperties(kafkaSource.getProperties());
- String topic = kafkaSource.getTopic();
- String bootstrapServers = kafkaSource.getBootstrapServers();
-
Format format = parsingFormat(
kafkaSource.getSerializationType(),
kafkaSource.isWrapWithInlongMsg(),
kafkaSource.getDataSeparator(),
kafkaSource.isIgnoreParseErrors());
- KafkaOffset kafkaOffset =
KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
- KafkaScanStartupMode startupMode;
- switch (kafkaOffset) {
- case EARLIEST:
- startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
- break;
- case SPECIFIC:
- startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
- break;
- case TIMESTAMP_MILLIS:
- startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
- break;
- case LATEST:
- default:
- startupMode = KafkaScanStartupMode.LATEST_OFFSET;
- }
+ KafkaScanStartupMode startupMode =
parseStartupMode(kafkaSource.getAutoOffsetReset());
+ String topic = kafkaSource.getTopic();
+ String bootstrapServers = kafkaSource.getBootstrapServers();
+
final String primaryKey = kafkaSource.getPrimaryKey();
String groupId = kafkaSource.getGroupId();
String partitionOffset = kafkaSource.getPartitionOffsets();
@@ -113,11 +98,61 @@ public class KafkaProvider implements ExtractNodeProvider,
LoadNodeProvider {
Map<String, String> properties =
parseProperties(kafkaSink.getProperties());
List<FieldInfo> fieldInfos =
parseSinkFieldInfos(kafkaSink.getSinkFieldList(), kafkaSink.getSinkName());
List<FieldRelation> fieldRelations =
parseSinkFields(kafkaSink.getSinkFieldList(), constantFieldMap);
- Integer sinkParallelism = null;
- if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) {
- sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum());
+
+ String partitionNum = kafkaSink.getPartitionNum();
+ Integer sinkParallelism = StringUtils.isNotBlank(partitionNum) ?
Integer.parseInt(partitionNum) : null;
+ Format format = parseFormat(kafkaSink.getSerializationType());
+
+ return new KafkaLoadNode(
+ kafkaSink.getSinkName(),
+ kafkaSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ Lists.newArrayList(),
+ null,
+ kafkaSink.getTopicName(),
+ kafkaSink.getBootstrapServers(),
+ format,
+ sinkParallelism,
+ properties,
+ kafkaSink.getPrimaryKey());
+ }
+
+ /**
+ * Map the Kafka auto offset reset strategy to a scan startup mode.
+ *
+ * @param autoOffsetReset the auto offset reset strategy, resolved via KafkaOffset.forName; EARLIEST, SPECIFIC
+ * and TIMESTAMP_MILLIS map to their matching modes, LATEST and any other constant to LATEST_OFFSET
+ * @return kafka scan startup mode
+ */
+ private KafkaScanStartupMode parseStartupMode(String autoOffsetReset) {
+ KafkaOffset kafkaOffset = KafkaOffset.forName(autoOffsetReset);
+ KafkaScanStartupMode startupMode;
+ switch (kafkaOffset) {
+ case EARLIEST:
+ startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
+ break;
+ case SPECIFIC:
+ startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
+ break;
+ case TIMESTAMP_MILLIS:
+ startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
+ break;
+ case LATEST:
+ default:
+ startupMode = KafkaScanStartupMode.LATEST_OFFSET;
}
- DataTypeEnum dataType =
DataTypeEnum.forType(kafkaSink.getSerializationType());
+ return startupMode;
+ }
+
+ /**
+ * parse Format
+ *
+ * @param serializationType data serialization type, e.g. csv, json, canal, avro
+ * @return the format for serialized content
+ */
+ private Format parseFormat(String serializationType) {
+ DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
Format format;
switch (dataType) {
case CSV:
@@ -141,19 +176,6 @@ public class KafkaProvider implements ExtractNodeProvider,
LoadNodeProvider {
default:
throw new IllegalArgumentException(String.format("Unsupported
dataType=%s for Kafka", dataType));
}
-
- return new KafkaLoadNode(
- kafkaSink.getSinkName(),
- kafkaSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- Lists.newArrayList(),
- null,
- kafkaSink.getTopicName(),
- kafkaSink.getBootstrapServers(),
- format,
- sinkParallelism,
- properties,
- kafkaSink.getPrimaryKey());
+ return format;
}
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
index 588671d370..acae2efeb3 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
-import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
@@ -25,14 +24,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.List;
import java.util.Map;
@@ -52,23 +47,9 @@ public class StarRocksProvider implements LoadNodeProvider {
Map<String, String> properties =
parseProperties(starRocksSink.getProperties());
List<FieldInfo> fieldInfos =
parseSinkFieldInfos(starRocksSink.getSinkFieldList(),
starRocksSink.getSinkName());
List<FieldRelation> fieldRelations =
parseSinkFields(starRocksSink.getSinkFieldList(), constantFieldMap);
+ Format format =
parsingSinkMultipleFormat(starRocksSink.getSinkMultipleEnable(),
+ starRocksSink.getSinkMultipleFormat());
- Format format = null;
- if (Boolean.TRUE.equals(starRocksSink.getSinkMultipleEnable())
- &&
StringUtils.isNotBlank(starRocksSink.getSinkMultipleFormat())) {
- DataTypeEnum dataType =
DataTypeEnum.forType(starRocksSink.getSinkMultipleFormat());
- switch (dataType) {
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- format = new DebeziumJsonFormat();
- break;
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported dataType=%s for
StarRocks", dataType));
- }
- }
return new StarRocksLoadNode(
starRocksSink.getSinkName(),
starRocksSink.getSinkName(),