This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ee16ae39 [INLONG-6681][Agent] Handle different sink type 
configurations (#6683)
1ee16ae39 is described below

commit 1ee16ae39af7f7c12f21a007823ba521e31e5b56
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Dec 2 10:21:40 2022 +0800

    [INLONG-6681][Agent] Handle different sink type configurations (#6683)
---
 .../org/apache/inlong/agent/conf/JobProfile.java   | 38 ++++++++++++++-
 .../apache/inlong/agent/constant/JobConstants.java |  8 +++-
 .../apache/inlong/agent/pojo/JobProfileDto.java    | 26 +++++++++-
 .../inlong/agent/conf/TestConfiguration.java       | 56 +++++++++++++++++++++-
 .../java/org/apache/inlong/agent/core/job/Job.java |  9 ++--
 .../inlong/agent/plugin/sinks/SenderManager.java   | 14 ++++--
 inlong-common/pom.xml                              |  4 ++
 .../{TaskTypeEnum.java => DataReportTypeEnum.java} | 55 +++++----------------
 .../apache/inlong/common/enums/TaskTypeEnum.java   |  5 ++
 .../inlong/common/pojo/agent/DataConfig.java       | 16 ++++++-
 .../common/pojo/dataproxy/DataProxyTopicInfo.java  |  6 +++
 .../common/pojo/dataproxy/MQClusterInfo.java       | 12 +++++
 12 files changed, 190 insertions(+), 59 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
index 655e95da4..47ff35710 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
@@ -18,14 +18,23 @@
 package org.apache.inlong.agent.conf;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+
+import java.util.List;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MQ_ClUSTERS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MQ_TOPIC;
 
 /**
  * job profile which contains details describing properties of one job.
  */
 public class JobProfile extends AbstractConfiguration {
 
-    private final Gson gson = new Gson();
+    private static final Gson GSON = new Gson();
 
     /**
      * parse json string to configuration instance.
@@ -75,10 +84,35 @@ public class JobProfile extends AbstractConfiguration {
     }
 
     public String toJsonStr() {
-        return gson.toJson(getConfigStorage());
+        return GSON.toJson(getConfigStorage());
     }
 
     public String getInstanceId() {
         return get(JobConstants.JOB_INSTANCE_ID);
     }
+
+    /**
+     * get MQClusterInfo list from config
+     */
+    public List<MQClusterInfo> getMqClusters() {
+        List<MQClusterInfo> result = null;
+        String mqClusterStr = get(JOB_MQ_ClUSTERS);
+        if (StringUtils.isNotBlank(mqClusterStr)) {
+            result = GSON.fromJson(mqClusterStr, new 
TypeToken<List<MQClusterInfo>>() {
+            }.getType());
+        }
+        return result;
+    }
+
+    /**
+     * get mqTopic from config
+     */
+    public DataProxyTopicInfo getMqTopic() {
+        DataProxyTopicInfo result = null;
+        String topicStr = get(JOB_MQ_TOPIC);
+        if (StringUtils.isNotBlank(topicStr)) {
+            result = GSON.fromJson(topicStr, DataProxyTopicInfo.class);
+        }
+        return result;
+    }
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index b4b8cb6e3..f762c90b2 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -34,7 +34,6 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_SOURCE_CLASS = "job.source";
     public static final String JOB_SOURCE_TYPE = "job.sourceType";
 
-    public static final String JOB_SINK = "job.sink";
     public static final String JOB_CHANNEL = "job.channel";
     public static final String JOB_NAME = "job.name";
     public static final String JOB_LINE_FILTER_PATTERN = "job.pattern";
@@ -44,6 +43,13 @@ public class JobConstants extends CommonConstants {
     public static final String DEFAULT_JOB_DESCRIPTION = "default job 
description";
     public static final String DEFAULT_JOB_LINE_FILTER = "";
 
+    // sink config
+    public static final String JOB_SINK = "job.sink";
+    public static final String JOB_PROXY_SEND = "job.proxySend";
+    public static final boolean DEFAULT_JOB_PROXY_SEND = false;
+    public static final String JOB_MQ_ClUSTERS = "job.mqClusters";
+    public static final String JOB_MQ_TOPIC = "job.topicInfo";
+
     // File job
     public static final String JOB_TRIGGER = "job.fileJob.trigger";
     public static final String JOB_DIR_FILTER_PATTERN = 
"job.fileJob.dir.pattern";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index f967dd1a8..77e1fa855 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -22,6 +22,7 @@ import lombok.Data;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.agent.pojo.FileJob.Line;
+import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 
@@ -37,6 +38,7 @@ public class JobProfileDto {
     public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATAPROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
+    public static final String PULSAR_SINK = 
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
 
     /**
      * file source
@@ -350,8 +352,24 @@ public class JobProfileDto {
         job.setOp(dataConfig.getOp());
         job.setDeliveryTime(dataConfig.getDeliveryTime());
         job.setUuid(dataConfig.getUuid());
-        job.setSink(DEFAULT_DATAPROXY_SINK);
         job.setVersion(dataConfig.getVersion());
+        // set sink type
+        if (dataConfig.getDataReportType() == 0) {
+            job.setSink(DEFAULT_DATAPROXY_SINK);
+            job.setProxySend(false);
+        } else if (dataConfig.getDataReportType() == 1) {
+            job.setSink(DEFAULT_DATAPROXY_SINK);
+            job.setProxySend(true);
+        } else {
+            String mqType = dataConfig.getMqClusters().get(0).getMqType();
+            job.setMqClusters(GSON.toJson(dataConfig.getMqClusters()));
+            job.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
+            if (mqType.equals(MQType.PULSAR)) {
+                job.setSink(PULSAR_SINK);
+            } else {
+                throw new IllegalArgumentException("input dataConfig" + 
dataConfig + "is invalid please check");
+            }
+        }
         TaskTypeEnum taskType = 
TaskTypeEnum.getTaskType(dataConfig.getTaskType());
         switch (requireNonNull(taskType)) {
             case SQL:
@@ -397,6 +415,9 @@ public class JobProfileDto {
                 job.setSource(MQTT_SOURCE);
                 profileDto.setJob(job);
                 break;
+            case MOCK:
+                profileDto.setJob(job);
+
             default:
         }
         return TriggerProfile.parseJsonStr(GSON.toJson(profileDto));
@@ -419,6 +440,9 @@ public class JobProfileDto {
         private String deliveryTime;
         private String uuid;
         private Integer version;
+        private boolean proxySend;
+        private String mqClusters;
+        private String topicInfo;
 
         private FileJob fileJob;
         private BinlogJob binlogJob;
diff --git 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
index ff150aca0..4019c896b 100755
--- 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
+++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
@@ -17,13 +17,27 @@
 
 package org.apache.inlong.agent.conf;
 
-import static org.junit.Assert.assertEquals;
-
 import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.agent.pojo.JobProfileDto;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_SINK;
+import static 
org.apache.inlong.agent.pojo.JobProfileDto.DEFAULT_DATAPROXY_SINK;
+import static org.apache.inlong.agent.pojo.JobProfileDto.PULSAR_SINK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class TestConfiguration {
 
     private static AgentConfiguration agentConf;
@@ -70,4 +84,42 @@ public class TestConfiguration {
         assertEquals("test", jobJsonConf.get(JobConstants.JOB_NAME));
     }
 
+    @Test
+    public void testJobSinkConf() {
+        DataConfig dataConfig = new DataConfig();
+        dataConfig.setTaskType(101);
+        dataConfig.setDataReportType(1);
+        JobProfile profile = JobProfileDto.convertToTriggerProfile(dataConfig);
+        assertEquals(profile.get(JOB_SINK), DEFAULT_DATAPROXY_SINK);
+        assertTrue(profile.getBoolean(JOB_PROXY_SEND, false));
+
+        dataConfig.setDataReportType(0);
+        profile = JobProfileDto.convertToTriggerProfile(dataConfig);
+        assertEquals(profile.get(JOB_SINK), DEFAULT_DATAPROXY_SINK);
+        assertFalse(profile.getBoolean(JOB_PROXY_SEND, true));
+
+        List<MQClusterInfo> mqClusterInfos = new ArrayList<>();
+        MQClusterInfo mqCluster = new MQClusterInfo();
+        mqCluster.setMqType(MQType.PULSAR);
+        mqCluster.setToken("token");
+        mqCluster.setUrl("mqurl");
+        assertTrue(mqCluster.isValid());
+
+        mqClusterInfos.add(mqCluster);
+        dataConfig.setDataReportType(2);
+        dataConfig.setMqClusters(mqClusterInfos);
+        dataConfig.setTopicInfo(new DataProxyTopicInfo("topic", "groupId"));
+        profile = JobProfileDto.convertToTriggerProfile(dataConfig);
+        List<MQClusterInfo> mqClusterResult = profile.getMqClusters();
+
+        assertEquals(profile.get(JOB_SINK), PULSAR_SINK);
+        assertEquals(mqClusterResult.size(), 1);
+        assertEquals(mqClusterResult.get(0).getToken(), "token");
+        assertEquals(mqClusterResult.get(0).getUrl(), "mqurl");
+
+        DataProxyTopicInfo topicResult = profile.getMqTopic();
+        assertEquals(topicResult.getTopic(), "topic");
+        assertEquals(topicResult.getInlongGroupId(), "groupId");
+    }
+
 }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 5f455624c..125fd7897 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -38,14 +38,15 @@ import java.util.List;
 public class Job {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);
-    private static int COUNTER = 1;
-    private final JobProfile jobConf;
+    protected static int COUNTER = 1;
+    protected final JobProfile jobConf;
     // job name
     private String name;
     // job description
     private String description;
-    private String jobInstanceId;
-    private ThreadLocal<Integer> threadNum = new ThreadLocal<Integer>() {
+    protected String jobInstanceId;
+    protected List<Task> taskList = new ArrayList<>();
+    protected ThreadLocal<Integer> threadNum = new ThreadLocal<Integer>() {
 
         protected Integer initialValue() {
             return 0;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 3e866206a..b2d4a83bc 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -52,6 +52,8 @@ import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AU
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_PROXY_SEND;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -89,6 +91,7 @@ public class SenderManager {
     private final String inlongGroupId;
     private final int maxSenderPerGroup;
     private final String sourcePath;
+    private final boolean proxySend;
 
     // metric
     private AgentMetricItemSet metricItemSet;
@@ -104,6 +107,7 @@ public class SenderManager {
         AgentConfiguration conf = AgentConfiguration.getAgentConf();
         managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
         managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
+        proxySend = jobConf.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
         localhost = jobConf.get(CommonConstants.PROXY_LOCAL_HOST, 
CommonConstants.DEFAULT_PROXY_LOCALHOST);
         netTag = jobConf.get(CommonConstants.PROXY_NET_TAG, 
CommonConstants.DEFAULT_PROXY_NET_TAG);
         isLocalVisit = jobConf.getBoolean(
@@ -240,10 +244,10 @@ public class SenderManager {
             AgentUtils.silenceSleepInMs(retrySleepTime);
         }
         try {
-            selectSender(batchMessage.getGroupId()).asyncSendMessage(
-                    new AgentSenderCallback(batchMessage, retry), 
batchMessage.getDataList(),
-                    batchMessage.getGroupId(), batchMessage.getStreamId(), 
batchMessage.getDataTime(),
-                    SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, 
TimeUnit.SECONDS, batchMessage.getExtraMap());
+            selectSender(batchMessage.getGroupId()).asyncSendMessage(new 
AgentSenderCallback(batchMessage, retry),
+                    batchMessage.getDataList(), batchMessage.getGroupId(), 
batchMessage.getStreamId(),
+                    batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(), 
maxSenderTimeout, TimeUnit.SECONDS,
+                    batchMessage.getExtraMap(), proxySend);
             int msgCnt = batchMessage.getDataList().size();
             getMetricItem(batchMessage.getGroupId(), 
batchMessage.getStreamId()).pluginSendCount.addAndGet(msgCnt);
 
@@ -275,7 +279,7 @@ public class SenderManager {
 
         try {
             SendResult result = 
selectSender(groupId).sendMessage(batchMessage.getDataList(), groupId, streamId,
-                    dataTime, "", maxSenderTimeout, TimeUnit.SECONDS, 
batchMessage.getExtraMap());
+                    dataTime, "", maxSenderTimeout, TimeUnit.SECONDS, 
batchMessage.getExtraMap(), proxySend);
             metricItem.pluginSendCount.addAndGet(msgCnt);
 
             if (result == SendResult.OK) {
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index e7ef42919..4d88d577d 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -80,6 +80,10 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataReportTypeEnum.java
similarity index 50%
copy from 
inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
copy to 
inlong-common/src/main/java/org/apache/inlong/common/enums/DataReportTypeEnum.java
index c4b5c75b9..7c9feb68c 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataReportTypeEnum.java
@@ -18,65 +18,34 @@
 package org.apache.inlong.common.enums;
 
 /**
- * Enum of task type.
+ * Enum of data report type
  */
-public enum TaskTypeEnum {
+public enum DataReportTypeEnum {
 
-    DATABASE_MIGRATION(0),
-    SQL(1),
-    BINLOG(2),
-    FILE(3),
-    KAFKA(4),
-    PULSAR(5),
-    POSTGRES(6),
-    ORACLE(7),
-    SQLSERVER(8),
-    MONGODB(9),
-    TUBEMQ(10),
-    REDIS(11),
-    MQTT(12),
-
-    ;
+    NORMAL_SEND_TO_DATAPROXY(0),
+    PROXY_SEND_TO_DATAPROXY(1),
+    DIRECT_SEND_TO_MQ(2);
 
     private final int type;
 
-    TaskTypeEnum(int type) {
+    DataReportTypeEnum(int type) {
         this.type = type;
     }
 
-    public static TaskTypeEnum getTaskType(int taskType) {
-        switch (taskType) {
+    public static DataReportTypeEnum getReportType(int type) {
+        switch (type) {
             case 0:
-                return DATABASE_MIGRATION;
+                return NORMAL_SEND_TO_DATAPROXY;
             case 1:
-                return SQL;
+                return PROXY_SEND_TO_DATAPROXY;
             case 2:
-                return BINLOG;
-            case 3:
-                return FILE;
-            case 4:
-                return KAFKA;
-            case 5:
-                return PULSAR;
-            case 6:
-                return POSTGRES;
-            case 7:
-                return ORACLE;
-            case 8:
-                return SQLSERVER;
-            case 9:
-                return MONGODB;
-            case 10:
-                return TUBEMQ;
-            case 12:
-                return MQTT;
+                return DIRECT_SEND_TO_MQ;
             default:
-                throw new RuntimeException("Unsupported task type " + 
taskType);
+                throw new RuntimeException("Unsupported data report type " + 
type);
         }
     }
 
     public int getType() {
         return type;
     }
-
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index c4b5c75b9..314cc9bde 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -36,6 +36,9 @@ public enum TaskTypeEnum {
     REDIS(11),
     MQTT(12),
 
+    // only used for unit test
+    MOCK(101)
+
     ;
 
     private final int type;
@@ -70,6 +73,8 @@ public enum TaskTypeEnum {
                 return TUBEMQ;
             case 12:
                 return MQTT;
+            case 101:
+                return MOCK;
             default:
                 throw new RuntimeException("Unsupported task type " + 
taskType);
         }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index e86c1142e..446a92977 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -18,11 +18,17 @@
 package org.apache.inlong.common.pojo.agent;
 
 import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.enums.DataReportTypeEnum;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
 
 import java.util.List;
 
+import static 
org.apache.inlong.common.enums.DataReportTypeEnum.DIRECT_SEND_TO_MQ;
+import static 
org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_DATAPROXY;
+import static 
org.apache.inlong.common.enums.DataReportTypeEnum.PROXY_SEND_TO_DATAPROXY;
+
 /**
  * The task config for agent.
  */
@@ -74,6 +80,14 @@ public class DataConfig {
     private DataProxyTopicInfo topicInfo;
 
     public boolean isValid() {
-        return true;
+        DataReportTypeEnum reportType = 
DataReportTypeEnum.getReportType(dataReportType);
+        if (reportType == NORMAL_SEND_TO_DATAPROXY || reportType == 
PROXY_SEND_TO_DATAPROXY) {
+            return true;
+        }
+        if (reportType == DIRECT_SEND_TO_MQ && 
CollectionUtils.isNotEmpty(mqClusters) && mqClusters.stream()
+                .allMatch(MQClusterInfo::isValid) && topicInfo.isValid()) {
+            return true;
+        }
+        return false;
     }
 }
\ No newline at end of file
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
index d1deaa4f1..1230f3568 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/DataProxyTopicInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.common.pojo.dataproxy;
 
+import org.apache.commons.lang3.StringUtils;
+
 /**
  * Topic info for DataProxy, includes the topic name and the inlongGroupId to 
which it belongs.
  */
@@ -83,4 +85,8 @@ public class DataProxyTopicInfo {
         this.m = m;
     }
 
+    public boolean isValid() {
+        return StringUtils.isNoneBlank(inlongGroupId) && 
StringUtils.isNotBlank(topic);
+    }
+
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
index 96cf7fa37..4b94d8391 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.common.pojo.dataproxy;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -27,6 +29,9 @@ public class MQClusterInfo {
 
     private String url;
     private String token;
+    /**
+     * the MQ type, e.g. MQType.PULSAR or MQType.TUBEMQ
+     */
     private String mqType;
     private Map<String, String> params = new HashMap<>();
 
@@ -61,4 +66,11 @@ public class MQClusterInfo {
     public void setParams(Map<String, String> params) {
         this.params = params;
     }
+
+    public boolean isValid() {
+        if (StringUtils.isBlank(mqType) || StringUtils.isBlank(url)) {
+            return false;
+        }
+        return true;
+    }
 }

Reply via email to