This is an automated email from the ASF dual-hosted git repository.
wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 051f04bb9d [INLONG-11838][Manager] Add the judgment of whether the cdc
audit ID is exist when passing metrics.changelog.audit.key (#11839)
051f04bb9d is described below
commit 051f04bb9d55e96ead6d38e69e89ba48c1a91f39
Author: fuweng11 <[email protected]>
AuthorDate: Mon Apr 21 19:34:08 2025 +0800
[INLONG-11838][Manager] Add the judgment of whether the cdc audit ID is
exist when passing metrics.changelog.audit.key (#11839)
* [INLONG-11838][Manager] Add the judgment of whether the cdc audit ID is
exist when passing metrics.changelog.audit.key
---
.../manager/service/resource/sort/SortFlinkConfigOperator.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortFlinkConfigOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortFlinkConfigOperator.java
index c7653eeb28..4f99be15f5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortFlinkConfigOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortFlinkConfigOperator.java
@@ -76,6 +76,8 @@ public class SortFlinkConfigOperator implements
SortConfigOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(SortFlinkConfigOperator.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String CDC_AUDIT_KEY = "metrics.changelog.audit.key";
+
@Autowired
private StreamSourceService sourceService;
@Autowired
@@ -318,8 +320,9 @@ public class SortFlinkConfigOperator implements
SortConfigOperator {
}
throw new BusinessException("current audit id can not
find cdc audit information");
}).collect(Collectors.toList());
- properties.putIfAbsent("metrics.changelog.audit.key",
- Joiner.on(InlongConstants.AMPERSAND).join(cdcAuditIdList));
+ if (CollectionUtils.isNotEmpty(cdcAuditIdList)) {
+ properties.putIfAbsent(CDC_AUDIT_KEY,
Joiner.on(InlongConstants.AMPERSAND).join(cdcAuditIdList));
+ }
} catch (Exception e) {
LOGGER.error("Current type ={} is not set auditId", type);
}