This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0064f3fc5b [INLONG-10655][Sort] Kafka and Pulsar Sink support parse
stream separator (#10671)
0064f3fc5b is described below
commit 0064f3fc5bbaaba166330ca4d12abe372e7fe7d9
Author: vernedeng <[email protected]>
AuthorDate: Thu Jul 18 21:57:48 2024 +0800
[INLONG-10655][Sort] Kafka and Pulsar Sink support parse stream separator
(#10671)
---
.../metrics/SortConfigMetricReporter.java | 54 ++++++++++++++++------
.../sort/standalone/sink/kafka/KafkaIdConfig.java | 8 ++++
.../standalone/sink/pulsar/PulsarIdConfig.java | 9 +++-
3 files changed, 57 insertions(+), 14 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
index 001759d8ea..da730cef84 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortConfigMetricReporter.java
@@ -130,30 +130,58 @@ public class SortConfigMetricReporter {
Collection<String> intersection =
CollectionUtils.intersection(fromTaskConfig.keySet(),
fromSortTaskConfig.keySet());
List<IdConfig> diff = intersection.stream()
- .filter(k ->
!fromTaskConfig.get(k).equals(fromSortTaskConfig.get(k)))
+ .filter(k -> {
+ IdConfig fromTask = fromTaskConfig.get(k);
+ IdConfig fromSortTask = fromSortTaskConfig.get(k);
+ if (fromTask.equals(fromSortTask)) {
+ return false;
+ }
+ log.warn("find different id config, fromTaskConfig={},
fromSortTaskConfig={}", fromTask,
+ fromSortTask);
+ return true;
+ })
.map(fromSortTaskConfig::get)
.collect(Collectors.toList());
+
// report diff
diff.forEach(idConfig -> {
listeners.forEach(listener ->
listener.reportClusterDiff(sortClusterName, sortTaskName,
idConfig.getInlongGroupId(),
idConfig.getInlongStreamId()));
});
+ log.warn("different id config size = {}", diff.size());
// report miss in sort cluster config
- fromTaskConfig.forEach((k, v) -> {
- if (!intersection.contains(k)) {
- listeners.forEach(listener ->
listener.reportMissInSortClusterConfig(sortClusterName, sortTaskName,
- v.getInlongGroupId(), v.getInlongStreamId()));
- }
- });
+ List<String> missInSortClusterConfig =
fromTaskConfig.entrySet().stream()
+ .filter(entry -> {
+ String k = entry.getKey();
+ IdConfig v = entry.getValue();
+ if (!intersection.contains(k)) {
+ listeners.forEach(listener ->
listener.reportMissInSortClusterConfig(sortClusterName,
+ sortTaskName, v.getInlongGroupId(),
v.getInlongStreamId()));
+ return true;
+ }
+ return false;
+ })
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
// report miss in sort config
- fromSortTaskConfig.forEach((k, v) -> {
- if (!intersection.contains(k)) {
- listeners.forEach(listener ->
listener.reportMissInSortConfig(sortClusterName, sortTaskName,
- v.getInlongGroupId(), v.getInlongStreamId()));
- }
- });
+ List<String> missInSortConfig = fromSortTaskConfig.entrySet().stream()
+ .filter(entry -> {
+ String k = entry.getKey();
+ IdConfig v = entry.getValue();
+ if (!intersection.contains(k)) {
+ listeners.forEach(listener ->
listener.reportMissInSortConfig(sortClusterName,
+ sortTaskName, v.getInlongGroupId(),
v.getInlongStreamId()));
+ return true;
+ }
+ return false;
+ })
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+
+ log.warn("report cluster diff, intersection={},
missInSortClusterConfig={}, missInSortConfig={}",
+ intersection, missInSortClusterConfig, missInSortConfig);
}
public static void reportSourceDiff(
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index ecbd8ad6f3..a1c01bdb73 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -60,6 +62,11 @@ public class KafkaIdConfig extends IdConfig {
public static KafkaIdConfig create(DataFlowConfig dataFlowConfig) {
KafkaSinkConfig sinkConfig = (KafkaSinkConfig)
dataFlowConfig.getSinkConfig();
+ DataTypeConfig dataTypeConfig =
dataFlowConfig.getSourceConfig().getDataTypeConfig();
+ String separator = DEFAULT_SEPARATOR;
+ if (dataTypeConfig instanceof CsvConfig) {
+ separator = String.valueOf(((CsvConfig)
dataTypeConfig).getDelimiter());
+ }
return KafkaIdConfig.builder()
.inlongGroupId(dataFlowConfig.getInlongGroupId())
@@ -67,6 +74,7 @@ public class KafkaIdConfig extends IdConfig {
.uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(),
dataFlowConfig.getInlongStreamId()))
.topic(sinkConfig.getTopicName())
.dataType(DataTypeEnum.TEXT)
+ .separator(separator)
.build();
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index c8fc0c33b2..7d4f30f752 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
@@ -62,13 +64,18 @@ public class PulsarIdConfig extends IdConfig {
public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) {
PulsarSinkConfig sinkConfig = (PulsarSinkConfig)
dataFlowConfig.getSinkConfig();
-
+ DataTypeConfig dataTypeConfig =
dataFlowConfig.getSourceConfig().getDataTypeConfig();
+ String separator = DEFAULT_SEPARATOR;
+ if (dataTypeConfig instanceof CsvConfig) {
+ separator = String.valueOf(((CsvConfig)
dataTypeConfig).getDelimiter());
+ }
return PulsarIdConfig.builder()
.inlongGroupId(dataFlowConfig.getInlongGroupId())
.inlongStreamId(dataFlowConfig.getInlongStreamId())
.uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(),
dataFlowConfig.getInlongStreamId()))
.topic(sinkConfig.getTopic())
.dataType(DataTypeEnum.TEXT)
+ .separator(separator)
.build();
}