This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6ce73d2c10 [INLONG-11307][Manager] Fix the problem of unable to obtain
extparams from multiple data sources (#11308)
6ce73d2c10 is described below
commit 6ce73d2c10b71c72928780106e670e80fcc7662e
Author: fuweng11 <[email protected]>
AuthorDate: Wed Oct 9 12:43:33 2024 +0800
[INLONG-11307][Manager] Fix the problem of unable to obtain extparams from
multiple data sources (#11308)
---
.../inlong/manager/service/source/AbstractSourceOperator.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 7c4c0c4882..5caf6d2473 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -74,6 +74,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
@@ -119,6 +120,9 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
private InlongStreamEntityMapper streamMapper;
@Autowired
private ObjectMapper objectMapper;
+ @Autowired
+ private AutowireCapableBeanFactory autowireCapableBeanFactory;
+ private SourceOperatorFactory operatorFactory;
/**
* Getting the source type.
@@ -523,7 +527,12 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
InlongGroupEntity groupEntity =
groupMapper.selectByGroupIdWithoutTenant(groupId);
InlongStreamEntity streamEntity =
streamMapper.selectByIdentifier(groupId, streamId);
- String extParams = getExtParams(entity);
+ if (operatorFactory == null) {
+ operatorFactory = new SourceOperatorFactory();
+ autowireCapableBeanFactory.autowireBean(operatorFactory);
+ }
+ StreamSourceOperator sourceOperator =
operatorFactory.getInstance(entity.getSourceType());
+ String extParams = sourceOperator.getExtParams(entity);
if (groupEntity != null && streamEntity != null) {
dataConfig.setState(
SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus()))