This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 381e34e1f [INLONG-7419][Agent] Fix error of MQTT connector (#7420)
381e34e1f is described below
commit 381e34e1f2ba564297a9c6dfe89768f6e7848235
Author: Lizhen <[email protected]>
AuthorDate: Mon Feb 27 11:11:13 2023 +0800
[INLONG-7419][Agent] Fix error of MQTT connector (#7420)
---
.../java/org/apache/inlong/agent/pojo/JobProfileDto.java | 5 +++--
.../main/java/org/apache/inlong/agent/pojo/MqttJob.java | 6 ++++--
.../apache/inlong/agent/plugin/sources/MqttSource.java | 10 ++++++----
.../inlong/agent/plugin/sources/reader/MqttReader.java | 16 ++++++++++++----
.../inlong/agent/plugin/sources/TestMqttSource.java | 1 -
5 files changed, 25 insertions(+), 13 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 169392f02..57cae7730 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -343,13 +343,14 @@ public class JobProfileDto {
MqttJob mqttJob = new MqttJob();
mqttJob.setServerURI(config.getServerURI());
- mqttJob.setUserName(config.getUserName());
+ mqttJob.setUserName(config.getUsername());
mqttJob.setPassword(config.getPassword());
+ mqttJob.setTopic(config.getTopic());
mqttJob.setConnectionTimeOut(config.getConnectionTimeOut());
mqttJob.setKeepAliveInterval(config.getKeepAliveInterval());
mqttJob.setQos(config.getQos());
mqttJob.setCleanSession(config.getCleanSession());
- mqttJob.setClientIdPrefix(config.getClientIdPrefix());
+ mqttJob.setClientIdPrefix(config.getClientId());
mqttJob.setQueueSize(config.getQueueSize());
mqttJob.setAutomaticReconnect(config.getAutomaticReconnect());
mqttJob.setMqttVersion(config.getMqttVersion());
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MqttJob.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MqttJob.java
index edf7a07aa..013b04a86 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MqttJob.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MqttJob.java
@@ -28,6 +28,7 @@ public class MqttJob {
private String serverURI;
private String userName;
private String password;
+ private String topic;
private String connectionTimeOut;
private String keepAliveInterval;
private String qos;
@@ -41,13 +42,14 @@ public class MqttJob {
public static class MqttJobConfig {
private String serverURI;
- private String userName;
+ private String username;
private String password;
+ private String topic;
private String connectionTimeOut;
private String keepAliveInterval;
private String qos;
private String cleanSession;
- private String clientIdPrefix;
+ private String clientId;
private String queueSize;
private String automaticReconnect;
private String mqttVersion;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
index 1f4e1b586..814a5ccd4 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
@@ -41,12 +41,12 @@ public class MqttSource extends AbstractSource {
private static final String JOB_MQTTJOB_CLIENTID = "";
- public static final String JOB_MQTTJOB_TOPICS = "job.mqttJob.topics";
+ public static final String JOB_MQTTJOB_TOPICS = "job.mqttJob.topic";
public MqttSource() {
}
- private List<Reader> splitSqlJob(String topics) {
+ private List<Reader> splitSqlJob(String topics, String instanceId) {
if (StringUtils.isEmpty(topics)) {
return null;
}
@@ -54,7 +54,9 @@ public class MqttSource extends AbstractSource {
String[] topicList = topics.split(CommonConstants.COMMA);
if (Objects.nonNull(topicList)) {
Arrays.stream(topicList).forEach(topic -> {
- result.add(new MqttReader(topic));
+ MqttReader mqttReader = new MqttReader(topic);
+ mqttReader.setReadSource(instanceId);
+ result.add(mqttReader);
});
}
return result;
@@ -66,7 +68,7 @@ public class MqttSource extends AbstractSource {
String topics = conf.get(JOB_MQTTJOB_TOPICS, StringUtils.EMPTY);
List<Reader> readerList = null;
if (StringUtils.isNotEmpty(topics)) {
- readerList = splitSqlJob(topics);
+ readerList = splitSqlJob(topics, conf.getInstanceId());
}
if (CollectionUtils.isNotEmpty(readerList)) {
sourceMetric.sourceSuccessCount.incrementAndGet();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
index 6a0b0d340..2f02a3732 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.sources.reader;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -43,6 +44,7 @@ public class MqttReader extends AbstractReader {
public static final String JOB_MQTT_USERNAME = "job.mqttJob.userName";
public static final String JOB_MQTT_PASSWORD = "job.mqttJob.password";
public static final String JOB_MQTT_SERVER_URI = "job.mqttJob.serverURI";
+ public static final String JOB_MQTT_TOPIC = "job.mqttJob.topic";
public static final String JOB_MQTT_CONNECTION_TIMEOUT =
"job.mqttJob.connectionTimeOut";
public static final String JOB_MQTT_KEEPALIVE_INTERVAL =
"job.mqttJob.keepAliveInterval";
public static final String JOB_MQTT_QOS = "job.mqttJob.qos";
@@ -89,6 +91,7 @@ public class MqttReader extends AbstractReader {
userName = jobConf.get(JOB_MQTT_USERNAME);
password = jobConf.get(JOB_MQTT_PASSWORD);
serverURI = jobConf.get(JOB_MQTT_SERVER_URI);
+ topic = jobConf.get(JOB_MQTT_TOPIC);
clientId = jobConf.get(JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_"
+ UUID.randomUUID();
cleanSession = jobConf.getBoolean(JOB_MQTT_CLEAN_SESSION, false);
automaticReconnect = jobConf.getBoolean(JOB_MQTT_AUTOMATIC_RECONNECT,
true);
@@ -116,8 +119,8 @@ public class MqttReader extends AbstractReader {
@Override
public void connectionLost(Throwable cause) {
- LOGGER.info("the mqtt connection is lost, try to
reconnect. jobId:{},serverURI:{},clientId:{}",
- instanceId, serverURI, clientId);
+ LOGGER.error("the mqtt jobId:{}, serverURI:{},
connection lost, {} ", instanceId,
+ serverURI, cause);
reconnect();
}
@@ -130,7 +133,8 @@ public class MqttReader extends AbstractReader {
byte[] recordValue = message.getPayload();
mqttMessagesQueue.put(new DefaultMessage(recordValue,
headerMap));
- LOGGER.debug("the mqtt receive message: {}", new
String(recordValue));
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
+ System.currentTimeMillis(), 1,
recordValue.length);
readerMetric.pluginReadSuccessCount.incrementAndGet();
readerMetric.pluginReadCount.incrementAndGet();
@@ -141,7 +145,7 @@ public class MqttReader extends AbstractReader {
}
});
client.connect(options);
- client.subscribe(topic, 1);
+ client.subscribe(topic, qos);
}
LOGGER.info("the mqtt subscribe topic is [{}], qos is [{}]",
topic, qos);
} catch (Exception e) {
@@ -227,6 +231,10 @@ public class MqttReader extends AbstractReader {
}
}
+ public void setReadSource(String instanceId) {
+ this.instanceId = instanceId;
+ }
+
@Override
public void destroy() {
synchronized (this) {
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
index 083bdfdb3..5a5faa698 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
@@ -92,7 +92,6 @@ public class TestMqttSource {
when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID),
anyString())).thenReturn("test_stream");
when(jobProfile.get(eq(MqttSource.JOB_MQTTJOB_TOPICS),
eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
topic1, topic2);
-
final MqttSource source = new MqttSource();
// assert