This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1ee16ae39 [INLONG-6681][Agent] Handle different sink type
configurations (#6683)
1ee16ae39 is described below
commit 1ee16ae39af7f7c12f21a007823ba521e31e5b56
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Dec 2 10:21:40 2022 +0800
[INLONG-6681][Agent] Handle different sink type configurations (#6683)
---
.../org/apache/inlong/agent/conf/JobProfile.java | 38 ++++++++++++++-
.../apache/inlong/agent/constant/JobConstants.java | 8 +++-
.../apache/inlong/agent/pojo/JobProfileDto.java | 26 +++++++++-
.../inlong/agent/conf/TestConfiguration.java | 56 +++++++++++++++++++++-
.../java/org/apache/inlong/agent/core/job/Job.java | 9 ++--
.../inlong/agent/plugin/sinks/SenderManager.java | 14 ++++--
inlong-common/pom.xml | 4 ++
.../{TaskTypeEnum.java => DataReportTypeEnum.java} | 55 +++++----------------
.../apache/inlong/common/enums/TaskTypeEnum.java | 5 ++
.../inlong/common/pojo/agent/DataConfig.java | 16 ++++++-
.../common/pojo/dataproxy/DataProxyTopicInfo.java | 6 +++
.../common/pojo/dataproxy/MQClusterInfo.java | 12 +++++
12 files changed, 190 insertions(+), 59 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
index 655e95da4..47ff35710 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
@@ -18,14 +18,23 @@
package org.apache.inlong.agent.conf;
import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+
+import java.util.List;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MQ_ClUSTERS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MQ_TOPIC;
/**
* job profile which contains details describing properties of one job.
*/
public class JobProfile extends AbstractConfiguration {
- private final Gson gson = new Gson();
+ private static final Gson GSON = new Gson();
/**
* parse json string to configuration instance.
@@ -75,10 +84,35 @@ public class JobProfile extends AbstractConfiguration {
}
public String toJsonStr() {
- return gson.toJson(getConfigStorage());
+ return GSON.toJson(getConfigStorage());
}
public String getInstanceId() {
return get(JobConstants.JOB_INSTANCE_ID);
}
+
+ /**
+ * get MQClusterInfo list from config
+ */
+ public List<MQClusterInfo> getMqClusters() {
+ List<MQClusterInfo> result = null;
+ String mqClusterStr = get(JOB_MQ_ClUSTERS);
+ if (StringUtils.isNotBlank(mqClusterStr)) {
+ result = GSON.fromJson(mqClusterStr, new
TypeToken<List<MQClusterInfo>>() {
+ }.getType());
+ }
+ return result;
+ }
+
+ /**
+ * get mqTopic from config
+ */
+ public DataProxyTopicInfo getMqTopic() {
+ DataProxyTopicInfo result = null;
+ String topicStr = get(JOB_MQ_TOPIC);
+ if (StringUtils.isNotBlank(topicStr)) {
+ result = GSON.fromJson(topicStr, DataProxyTopicInfo.class);
+ }
+ return result;
+ }
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index b4b8cb6e3..f762c90b2 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -34,7 +34,6 @@ public class JobConstants extends CommonConstants {
public static final String JOB_SOURCE_CLASS = "job.source";
public static final String JOB_SOURCE_TYPE = "job.sourceType";
- public static final String JOB_SINK = "job.sink";
public static final String JOB_CHANNEL = "job.channel";
public static final String JOB_NAME = "job.name";
public static final String JOB_LINE_FILTER_PATTERN = "job.pattern";
@@ -44,6 +43,13 @@ public class JobConstants extends CommonConstants {
public static final String DEFAULT_JOB_DESCRIPTION = "default job
description";
public static final String DEFAULT_JOB_LINE_FILTER = "";
+ // sink config
+ public static final String JOB_SINK = "job.sink";
+ public static final String JOB_PROXY_SEND = "job.proxySend";
+ public static final boolean DEFAULT_JOB_PROXY_SEND = false;
+ public static final String JOB_MQ_ClUSTERS = "job.mqClusters";
+ public static final String JOB_MQ_TOPIC = "job.topicInfo";
+
// File job
public static final String JOB_TRIGGER = "job.fileJob.trigger";
public static final String JOB_DIR_FILTER_PATTERN =
"job.fileJob.dir.pattern";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index f967dd1a8..77e1fa855 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -22,6 +22,7 @@ import lombok.Data;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.pojo.FileJob.Line;
+import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
@@ -37,6 +38,7 @@ public class JobProfileDto {
public static final String DEFAULT_CHANNEL =
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATAPROXY_SINK =
"org.apache.inlong.agent.plugin.sinks.ProxySink";
+ public static final String PULSAR_SINK =
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
/**
* file source
@@ -350,8 +352,24 @@ public class JobProfileDto {
job.setOp(dataConfig.getOp());
job.setDeliveryTime(dataConfig.getDeliveryTime());
job.setUuid(dataConfig.getUuid());
- job.setSink(DEFAULT_DATAPROXY_SINK);
job.setVersion(dataConfig.getVersion());
+ // set sink type
+ if (dataConfig.getDataReportType() == 0) {
+ job.setSink(DEFAULT_DATAPROXY_SINK);
+ job.setProxySend(false);
+ } else if (dataConfig.getDataReportType() == 1) {
+ job.setSink(DEFAULT_DATAPROXY_SINK);
+ job.setProxySend(true);
+ } else {
+ String mqType = dataConfig.getMqClusters().get(0).getMqType();
+ job.setMqClusters(GSON.toJson(dataConfig.getMqClusters()));
+ job.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
+ if (mqType.equals(MQType.PULSAR)) {
+ job.setSink(PULSAR_SINK);
+ } else {
+ throw new IllegalArgumentException("input dataConfig" +
dataConfig + "is invalid please check");
+ }
+ }
TaskTypeEnum taskType =
TaskTypeEnum.getTaskType(dataConfig.getTaskType());
switch (requireNonNull(taskType)) {
case SQL:
@@ -397,6 +415,9 @@ public class JobProfileDto {
job.setSource(MQTT_SOURCE);
profileDto.setJob(job);
break;
+ case MOCK:
+ profileDto.setJob(job);
+
default:
}
return TriggerProfile.parseJsonStr(GSON.toJson(profileDto));
@@ -419,6 +440,9 @@ public class JobProfileDto {
private String deliveryTime;
private String uuid;
private Integer version;
+ private boolean proxySend;
+ private String mqClusters;
+ private String topicInfo;
private FileJob fileJob;
private BinlogJob binlogJob;
diff --git
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
index ff150aca0..4019c896b 100755
---
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
+++
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
@@ -17,13 +17,27 @@
package org.apache.inlong.agent.conf;
-import static org.junit.Assert.assertEquals;
-
import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.agent.pojo.JobProfileDto;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_SINK;
+import static
org.apache.inlong.agent.pojo.JobProfileDto.DEFAULT_DATAPROXY_SINK;
+import static org.apache.inlong.agent.pojo.JobProfileDto.PULSAR_SINK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class TestConfiguration {
private static AgentConfiguration agentConf;
@@ -70,4 +84,42 @@ public class TestConfiguration {
assertEquals("test", jobJsonConf.get(JobConstants.JOB_NAME));
}
+ @Test
+ public void testJobSinkConf() {
+ DataConfig dataConfig = new DataConfig();
+ dataConfig.setTaskType(101);
+ dataConfig.setDataReportType(1);
+ JobProfile profile = JobProfileDto.convertToTriggerProfile(dataConfig);
+ assertEquals(profile.get(JOB_SINK), DEFAULT_DATAPROXY_SINK);
+ assertTrue(profile.getBoolean(JOB_PROXY_SEND, false));
+
+ dataConfig.setDataReportType(0);
+ profile = JobProfileDto.convertToTriggerProfile(dataConfig);
+ assertEquals(profile.get(JOB_SINK), DEFAULT_DATAPROXY_SINK);
+ assertFalse(profile.getBoolean(JOB_PROXY_SEND, true));
+
+ List<MQClusterInfo> mqClusterInfos = new ArrayList<>();
+ MQClusterInfo mqCluster = new MQClusterInfo();
+ mqCluster.setMqType(MQType.PULSAR);
+ mqCluster.setToken("token");
+ mqCluster.setUrl("mqurl");
+ assertTrue(mqCluster.isValid());
+
+ mqClusterInfos.add(mqCluster);
+ dataConfig.setDataReportType(2);
+ dataConfig.setMqClusters(mqClusterInfos);
+ dataConfig.setTopicInfo(new DataProxyTopicInfo("topic", "groupId"));
+ profile = JobProfileDto.convertToTriggerProfile(dataConfig);
+ List<MQClusterInfo> mqClusterResult = profile.getMqClusters();
+
+ assertEquals(profile.get(JOB_SINK), PULSAR_SINK);
+ assertEquals(mqClusterResult.size(), 1);
+ assertEquals(mqClusterResult.get(0).getToken(), "token");
+ assertEquals(mqClusterResult.get(0).getUrl(), "mqurl");
+
+ DataProxyTopicInfo topicResult = profile.getMqTopic();
+ assertEquals(topicResult.getTopic(), "topic");
+ assertEquals(topicResult.getInlongGroupId(), "groupId");
+ }
+
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 5f455624c..125fd7897 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -38,14 +38,15 @@ import java.util.List;
public class Job {
private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);
- private static int COUNTER = 1;
- private final JobProfile jobConf;
+ protected static int COUNTER = 1;
+ protected final JobProfile jobConf;
// job name
private String name;
// job description
private String description;
- private String jobInstanceId;
- private ThreadLocal<Integer> threadNum = new ThreadLocal<Integer>() {
+ protected String jobInstanceId;
+ protected List<Task> taskList = new ArrayList<>();
+ protected ThreadLocal<Integer> threadNum = new ThreadLocal<Integer>() {
protected Integer initialValue() {
return 0;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 3e866206a..b2d4a83bc 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -52,6 +52,8 @@ import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AU
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_PROXY_SEND;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -89,6 +91,7 @@ public class SenderManager {
private final String inlongGroupId;
private final int maxSenderPerGroup;
private final String sourcePath;
+ private final boolean proxySend;
// metric
private AgentMetricItemSet metricItemSet;
@@ -104,6 +107,7 @@ public class SenderManager {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
+ proxySend = jobConf.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
localhost = jobConf.get(CommonConstants.PROXY_LOCAL_HOST,
CommonConstants.DEFAULT_PROXY_LOCALHOST);
netTag = jobConf.get(CommonConstants.PROXY_NET_TAG,
CommonConstants.DEFAULT_PROXY_NET_TAG);
isLocalVisit = jobConf.getBoolean(
@@ -240,10 +244,10 @@ public class SenderManager {
AgentUtils.silenceSleepInMs(retrySleepTime);
}
try {
- selectSender(batchMessage.getGroupId()).asyncSendMessage(
- new AgentSenderCallback(batchMessage, retry),
batchMessage.getDataList(),
- batchMessage.getGroupId(), batchMessage.getStreamId(),
batchMessage.getDataTime(),
- SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout,
TimeUnit.SECONDS, batchMessage.getExtraMap());
+ selectSender(batchMessage.getGroupId()).asyncSendMessage(new
AgentSenderCallback(batchMessage, retry),
+ batchMessage.getDataList(), batchMessage.getGroupId(),
batchMessage.getStreamId(),
+ batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(),
maxSenderTimeout, TimeUnit.SECONDS,
+ batchMessage.getExtraMap(), proxySend);
int msgCnt = batchMessage.getDataList().size();
getMetricItem(batchMessage.getGroupId(),
batchMessage.getStreamId()).pluginSendCount.addAndGet(msgCnt);
@@ -275,7 +279,7 @@ public class SenderManager {
try {
SendResult result =
selectSender(groupId).sendMessage(batchMessage.getDataList(), groupId, streamId,
- dataTime, "", maxSenderTimeout, TimeUnit.SECONDS,
batchMessage.getExtraMap());
+ dataTime, "", maxSenderTimeout, TimeUnit.SECONDS,
batchMessage.getExtraMap(), proxySend);
metricItem.pluginSendCount.addAndGet(msgCnt);
if (result == SendResult.OK) {
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index e7ef42919..4d88d577d 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -80,6 +80,10 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataReportTypeEnum.java
similarity index 50%
copy from
inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
copy to
inlong-common/src/main/java/org/apache/inlong/common/enums/DataReportTypeEnum.java
index c4b5c75b9..7c9feb68c 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataReportTypeEnum.java
@@ -18,65 +18,34 @@
package org.apache.inlong.common.enums;
/**
- * Enum of task type.
+ * Enum of data report type
*/
-public enum TaskTypeEnum {
+public enum DataReportTypeEnum {
- DATABASE_MIGRATION(0),
- SQL(1),
- BINLOG(2),
- FILE(3),
- KAFKA(4),
- PULSAR(5),
- POSTGRES(6),
- ORACLE(7),
- SQLSERVER(8),
- MONGODB(9),
- TUBEMQ(10),
- REDIS(11),
- MQTT(12),
-
- ;
+ NORMAL_SEND_TO_DATAPROXY(0),
+ PROXY_SEND_TO_DATAPROXY(1),
+ DIRECT_SEND_TO_MQ(2);
private final int type;
- TaskTypeEnum(int type) {
+ DataReportTypeEnum(int type) {
this.type = type;
}
- public static TaskTypeEnum getTaskType(int taskType) {
- switch (taskType) {
+ public static DataReportTypeEnum getReportType(int type) {
+ switch (type) {
case 0:
- return DATABASE_MIGRATION;
+ return NORMAL_SEND_TO_DATAPROXY;
case 1:
- return SQL;
+ return PROXY_SEND_TO_DATAPROXY;
case 2:
- return BINLOG;
- case 3:
- return FILE;
- case 4:
- return KAFKA;
- case 5:
- return PULSAR;
- case 6:
- return POSTGRES;
- case 7:
- return ORACLE;
- case 8:
- return SQLSERVER;
- case 9:
- return MONGODB;
- case 10:
- return TUBEMQ;
- case 12:
- return MQTT;
+ return DIRECT_SEND_TO_MQ;
default:
- throw new RuntimeException("Unsupported task type " +
taskType);
+ throw new RuntimeException("Unsupported data report type " +
type);
}
}
public int getType() {
return type;
}
-
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index c4b5c75b9..314cc9bde 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -36,6 +36,9 @@ public enum TaskTypeEnum {
REDIS(11),
MQTT(12),
+ // only used for unit test
+ MOCK(101)
+
;
private final int type;
@@ -70,6 +73,8 @@ public enum TaskTypeEnum {
return TUBEMQ;
case 12:
return MQTT;
+ case 101:
+ return MOCK;
default:
throw new RuntimeException("Unsupported task type " +
taskType);
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index e86c1142e..446a92977 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -18,11 +18,17 @@
package org.apache.inlong.common.pojo.agent;
import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.enums.DataReportTypeEnum;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import java.util.List;
+import static
org.apache.inlong.common.enums.DataReportTypeEnum.DIRECT_SEND_TO_MQ;
+import static
org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_DATAPROXY;
+import static
org.apache.inlong.common.enums.DataReportTypeEnum.PROXY_SEND_TO_DATAPROXY;
+
/**
* The task config for agent.
*/
@@ -74,6 +80,14 @@ public class DataConfig {
private DataProxyTopicInfo topicInfo;
public boolean isValid() {
- return true;
+ DataReportTypeEnum reportType =
DataReportTypeEnum.getReportType(dataReportType);
+ if (reportType == NORMAL_SEND_TO_DATAPROXY || reportType ==
PROXY_SEND_TO_DATAPROXY) {
+ return true;
+ }
+ if (reportType == DIRECT_SEND_TO_MQ &&
CollectionUtils.isNotEmpty(mqClusters) && mqClusters.stream()
+ .allMatch(MQClusterInfo::isValid) && topicInfo.isValid()) {
+ return true;
+ }
+ return false;
}
}
\ No newline at end of file
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
index d1deaa4f1..1230f3568 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
@@ -17,6 +17,8 @@
package org.apache.inlong.common.pojo.dataproxy;
+import org.apache.commons.lang3.StringUtils;
+
/**
* Topic info for DataProxy, includes the topic name and the inlongGroupId to
which it belongs.
*/
@@ -83,4 +85,8 @@ public class DataProxyTopicInfo {
this.m = m;
}
+ public boolean isValid() {
+ return StringUtils.isNoneBlank(inlongGroupId) &&
StringUtils.isNotBlank(topic);
+ }
+
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
index 96cf7fa37..4b94d8391 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
@@ -17,6 +17,8 @@
package org.apache.inlong.common.pojo.dataproxy;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.HashMap;
import java.util.Map;
@@ -27,6 +29,9 @@ public class MQClusterInfo {
private String url;
private String token;
+ /**
+ * MQType.PULSAR, MQType.TUBEMQ or MQType.PULSAR
+ */
private String mqType;
private Map<String, String> params = new HashMap<>();
@@ -61,4 +66,11 @@ public class MQClusterInfo {
public void setParams(Map<String, String> params) {
this.params = params;
}
+
+ public boolean isValid() {
+ if (StringUtils.isBlank(mqType) || StringUtils.isBlank(url)) {
+ return false;
+ }
+ return true;
+ }
}