This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 34a7e8f50 [INLONG-7411][Sort] Fix the invalid of kafka source metric
due to inlongMetric being null (#7409)
34a7e8f50 is described below
commit 34a7e8f501e9edde01a48b4413fcac3770644536
Author: LinChen <[email protected]>
AuthorDate: Thu Feb 23 16:21:09 2023 +0800
[INLONG-7411][Sort] Fix the invalid of kafka source metric due to
inlongMetric being null (#7409)
Co-authored-by: leolinchen <[email protected]>
---
.../inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index f41c9a78b..73507aa6c 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -217,6 +217,8 @@ public class UpsertKafkaDynamicTableFactory
// Build the dirty data side-output
final DirtyOptions dirtyOptions =
DirtyOptions.fromConfig(tableOptions);
final DirtySink<String> dirtySink =
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+ final String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ final String auditHostAndPorts =
tableOptions.getOptional(INLONG_AUDIT).orElse(null);
return new KafkaDynamicSource(
schema.toPhysicalRowDataType(),
keyDecodingFormat,
@@ -231,8 +233,8 @@ public class UpsertKafkaDynamicTableFactory
Collections.emptyMap(),
0,
true,
- null,
- null,
+ inlongMetric,
+ auditHostAndPorts,
dirtyOptions,
dirtySink);
}