This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 381e34e1f [INLONG-7419][Agent] Fix error of MQTT connector (#7420)
381e34e1f is described below

commit 381e34e1f2ba564297a9c6dfe89768f6e7848235
Author: Lizhen <[email protected]>
AuthorDate: Mon Feb 27 11:11:13 2023 +0800

    [INLONG-7419][Agent] Fix error of MQTT connector (#7420)
---
 .../java/org/apache/inlong/agent/pojo/JobProfileDto.java |  5 +++--
 .../main/java/org/apache/inlong/agent/pojo/MqttJob.java  |  6 ++++--
 .../apache/inlong/agent/plugin/sources/MqttSource.java   | 10 ++++++----
 .../inlong/agent/plugin/sources/reader/MqttReader.java   | 16 ++++++++++++----
 .../inlong/agent/plugin/sources/TestMqttSource.java      |  1 -
 5 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 169392f02..57cae7730 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -343,13 +343,14 @@ public class JobProfileDto {
         MqttJob mqttJob = new MqttJob();
 
         mqttJob.setServerURI(config.getServerURI());
-        mqttJob.setUserName(config.getUserName());
+        mqttJob.setUserName(config.getUsername());
         mqttJob.setPassword(config.getPassword());
+        mqttJob.setTopic(config.getTopic());
         mqttJob.setConnectionTimeOut(config.getConnectionTimeOut());
         mqttJob.setKeepAliveInterval(config.getKeepAliveInterval());
         mqttJob.setQos(config.getQos());
         mqttJob.setCleanSession(config.getCleanSession());
-        mqttJob.setClientIdPrefix(config.getClientIdPrefix());
+        mqttJob.setClientIdPrefix(config.getClientId());
         mqttJob.setQueueSize(config.getQueueSize());
         mqttJob.setAutomaticReconnect(config.getAutomaticReconnect());
         mqttJob.setMqttVersion(config.getMqttVersion());
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MqttJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MqttJob.java
index edf7a07aa..013b04a86 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MqttJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MqttJob.java
@@ -28,6 +28,7 @@ public class MqttJob {
     private String serverURI;
     private String userName;
     private String password;
+    private String topic;
     private String connectionTimeOut;
     private String keepAliveInterval;
     private String qos;
@@ -41,13 +42,14 @@ public class MqttJob {
     public static class MqttJobConfig {
 
         private String serverURI;
-        private String userName;
+        private String username;
         private String password;
+        private String topic;
         private String connectionTimeOut;
         private String keepAliveInterval;
         private String qos;
         private String cleanSession;
-        private String clientIdPrefix;
+        private String clientId;
         private String queueSize;
         private String automaticReconnect;
         private String mqttVersion;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
index 1f4e1b586..814a5ccd4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
@@ -41,12 +41,12 @@ public class MqttSource extends AbstractSource {
 
     private static final String JOB_MQTTJOB_CLIENTID = "";
 
-    public static final String JOB_MQTTJOB_TOPICS = "job.mqttJob.topics";
+    public static final String JOB_MQTTJOB_TOPICS = "job.mqttJob.topic";
 
     public MqttSource() {
     }
 
-    private List<Reader> splitSqlJob(String topics) {
+    private List<Reader> splitSqlJob(String topics, String instanceId) {
         if (StringUtils.isEmpty(topics)) {
             return null;
         }
@@ -54,7 +54,9 @@ public class MqttSource extends AbstractSource {
         String[] topicList = topics.split(CommonConstants.COMMA);
         if (Objects.nonNull(topicList)) {
             Arrays.stream(topicList).forEach(topic -> {
-                result.add(new MqttReader(topic));
+                MqttReader mqttReader = new MqttReader(topic);
+                mqttReader.setReadSource(instanceId);
+                result.add(mqttReader);
             });
         }
         return result;
@@ -66,7 +68,7 @@ public class MqttSource extends AbstractSource {
         String topics = conf.get(JOB_MQTTJOB_TOPICS, StringUtils.EMPTY);
         List<Reader> readerList = null;
         if (StringUtils.isNotEmpty(topics)) {
-            readerList = splitSqlJob(topics);
+            readerList = splitSqlJob(topics, conf.getInstanceId());
         }
         if (CollectionUtils.isNotEmpty(readerList)) {
             sourceMetric.sourceSuccessCount.incrementAndGet();
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
index 6a0b0d340..2f02a3732 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.sources.reader;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -43,6 +44,7 @@ public class MqttReader extends AbstractReader {
     public static final String JOB_MQTT_USERNAME = "job.mqttJob.userName";
     public static final String JOB_MQTT_PASSWORD = "job.mqttJob.password";
     public static final String JOB_MQTT_SERVER_URI = "job.mqttJob.serverURI";
+    public static final String JOB_MQTT_TOPIC = "job.mqttJob.topic";
     public static final String JOB_MQTT_CONNECTION_TIMEOUT = "job.mqttJob.connectionTimeOut";
     public static final String JOB_MQTT_KEEPALIVE_INTERVAL = "job.mqttJob.keepAliveInterval";
     public static final String JOB_MQTT_QOS = "job.mqttJob.qos";
@@ -89,6 +91,7 @@ public class MqttReader extends AbstractReader {
         userName = jobConf.get(JOB_MQTT_USERNAME);
         password = jobConf.get(JOB_MQTT_PASSWORD);
         serverURI = jobConf.get(JOB_MQTT_SERVER_URI);
+        topic = jobConf.get(JOB_MQTT_TOPIC);
         clientId = jobConf.get(JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" + UUID.randomUUID();
         cleanSession = jobConf.getBoolean(JOB_MQTT_CLEAN_SESSION, false);
         automaticReconnect = jobConf.getBoolean(JOB_MQTT_AUTOMATIC_RECONNECT, true);
@@ -116,8 +119,8 @@ public class MqttReader extends AbstractReader {
 
                     @Override
                     public void connectionLost(Throwable cause) {
-                        LOGGER.info("the mqtt connection is lost, try to reconnect. jobId:{},serverURI:{},clientId:{}",
-                                instanceId, serverURI, clientId);
+                        LOGGER.error("the mqtt jobId:{}, serverURI:{}, connection lost, {} ", instanceId,
+                                serverURI, cause);
                         reconnect();
                     }
 
@@ -130,7 +133,8 @@ public class MqttReader extends AbstractReader {
                         byte[] recordValue = message.getPayload();
                         mqttMessagesQueue.put(new DefaultMessage(recordValue, headerMap));
 
-                        LOGGER.debug("the mqtt receive message: {}", new String(recordValue));
+                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                                System.currentTimeMillis(), 1, recordValue.length);
 
                         readerMetric.pluginReadSuccessCount.incrementAndGet();
                         readerMetric.pluginReadCount.incrementAndGet();
@@ -141,7 +145,7 @@ public class MqttReader extends AbstractReader {
                     }
                 });
                 client.connect(options);
-                client.subscribe(topic, 1);
+                client.subscribe(topic, qos);
             }
             LOGGER.info("the mqtt subscribe topic is [{}], qos is [{}]", topic, qos);
         } catch (Exception e) {
@@ -227,6 +231,10 @@ public class MqttReader extends AbstractReader {
         }
     }
 
+    public void setReadSource(String instanceId) {
+        this.instanceId = instanceId;
+    }
+
     @Override
     public void destroy() {
         synchronized (this) {
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
index 083bdfdb3..5a5faa698 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
@@ -92,7 +92,6 @@ public class TestMqttSource {
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn("test_stream");
         when(jobProfile.get(eq(MqttSource.JOB_MQTTJOB_TOPICS), eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
                 topic1, topic2);
-
         final MqttSource source = new MqttSource();
 
         // assert

Reply via email to