This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3822ed2254 [INLONG-10286][Agent] Update the MQTT Source (#10727)
3822ed2254 is described below
commit 3822ed22542f34b41e8d0404eac890e710d3a9aa
Author: 马浩天 <[email protected]>
AuthorDate: Tue Jul 30 18:59:11 2024 +0800
[INLONG-10286][Agent] Update the MQTT Source (#10727)
---
.../inlong/agent/constant/TaskConstants.java | 14 ++
.../apache/inlong/agent/pojo/TaskProfileDto.java | 2 +
.../inlong/agent/plugin/instance/MqttInstance.java | 29 ++++
.../inlong/agent/plugin/sources/MqttSource.java | 185 ++++++++++++++-------
.../agent/plugin/sources/reader/MqttReader.java | 39 ++---
.../apache/inlong/agent/plugin/task/MqttTask.java | 103 ++++++++++++
.../agent/plugin/sources/TestMqttConnect.java | 14 +-
.../agent/plugin/sources/TestMqttReader.java | 13 +-
.../agent/plugin/sources/TestMqttSource.java | 3 +-
9 files changed, 306 insertions(+), 96 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 1607742556..4cd6ac56ed 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -139,6 +139,20 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_POSTGRES_PLUGIN_NAME =
"task.postgreSQLTask.pluginName";
public static final String TASK_POSTGRES_SNAPSHOT_MODE =
"task.postgreSQLTask.snapshotMode";
+ // MQTT
+ public static final String TASK_MQTT_USERNAME = "task.mqttTask.userName";
+ public static final String TASK_MQTT_PASSWORD = "task.mqttTask.password";
+ public static final String TASK_MQTT_SERVER_URI =
"task.mqttTask.serverURI";
+ public static final String TASK_MQTT_TOPIC = "task.mqttTask.topic";
+ public static final String TASK_MQTT_CONNECTION_TIMEOUT =
"task.mqttTask.connectionTimeOut";
+ public static final String TASK_MQTT_KEEPALIVE_INTERVAL =
"task.mqttTask.keepAliveInterval";
+ public static final String TASK_MQTT_QOS = "task.mqttTask.qos";
+ public static final String TASK_MQTT_CLEAN_SESSION =
"task.mqttTask.cleanSession";
+ public static final String TASK_MQTT_CLIENT_ID_PREFIX =
"task.mqttTask.clientIdPrefix";
+ public static final String TASK_MQTT_QUEUE_SIZE =
"task.mqttTask.queueSize";
+ public static final String TASK_MQTT_AUTOMATIC_RECONNECT =
"task.mqttTask.automaticReconnect";
+ public static final String TASK_MQTT_VERSION = "task.mqttTask.mqttVersion";
+
public static final String TASK_STATE = "task.state";
public static final String INSTANCE_STATE = "instance.state";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 1bd806254b..cc6cfe8244 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -53,6 +53,7 @@ public class TaskProfileDto {
public static final String DEFAULT_PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK =
"org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_POSTGRESQL_TASK =
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
+ public static final String DEFAULT_MQTT_TASK =
"org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_CHANNEL =
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK =
"org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -513,6 +514,7 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case MQTT:
+ task.setTaskClass(DEFAULT_MQTT_TASK);
MqttTask mqttTask = getMqttTask(dataConfig);
task.setMqttTask(mqttTask);
task.setSource(MQTT_SOURCE);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java
new file mode 100644
index 0000000000..ec4067f4e1
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+public class MqttInstance extends CommonInstance {
+
+ @Override
+ public void setInodeInfo(InstanceProfile profile) {
+ profile.set(TaskConstants.INODE_INFO, "");
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
index a1c4af9be7..144a1e6cc4 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
@@ -18,96 +18,171 @@
package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.MqttReader;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
public class MqttSource extends AbstractSource {
private static final Logger LOGGER =
LoggerFactory.getLogger(MqttSource.class);
- private static final String JOB_MQTTJOB_PARAM_PREFIX = "job.mqttJob.";
+ private MqttClient client;
+ private LinkedBlockingQueue<DefaultMessage> mqttMessagesQueue;
+ private String serverURI;
- private static final String JOB_MQTTJOB_SERVERURI = "";
+ private String topic;
- private static final String JOB_MQTTJOB_CLIENTID = "";
+ private int qos;
- public static final String JOB_MQTTJOB_TOPICS = "job.mqttJob.topic";
+ private String clientId;
+
+ MqttConnectOptions options;
public MqttSource() {
}
- private List<Reader> splitSqlJob(String topics, String instanceId) {
- if (StringUtils.isEmpty(topics)) {
- return null;
- }
- final List<Reader> result = new ArrayList<>();
- String[] topicList = topics.split(CommonConstants.COMMA);
- if (Objects.nonNull(topicList)) {
- Arrays.stream(topicList).forEach(topic -> {
- MqttReader mqttReader = new MqttReader(topic);
- mqttReader.setReadSource(instanceId);
- result.add(mqttReader);
- });
- }
- return result;
+ @Override
+ protected String getThreadName() {
+ return "mqtt-source-" + taskId + "-" + instanceId;
}
@Override
- public List<Reader> split(TaskProfile conf) {
- String topics = conf.get(JOB_MQTTJOB_TOPICS, StringUtils.EMPTY);
- List<Reader> readerList = null;
- if (StringUtils.isNotEmpty(topics)) {
- }
- if (CollectionUtils.isNotEmpty(readerList)) {
- sourceMetric.sourceSuccessCount.incrementAndGet();
- } else {
- sourceMetric.sourceFailCount.incrementAndGet();
+ protected void initSource(InstanceProfile profile) {
+ try {
+ LOGGER.info("MqttSource init: {}", profile.toJsonStr());
+ mqttMessagesQueue = new
LinkedBlockingQueue<>(profile.getInt(TaskConstants.TASK_MQTT_QUEUE_SIZE, 1000));
+ serverURI = profile.get(TaskConstants.TASK_MQTT_SERVER_URI);
+ instanceId = profile.getInstanceId();
+ topic = profile.get(TaskConstants.TASK_MQTT_TOPIC);
+ qos = profile.getInt(TaskConstants.TASK_MQTT_QOS, 1);
+ clientId = profile.get(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX,
"mqtt_client") + "_" + UUID.randomUUID();
+ initConnectOptions(profile);
+ mqttConnect();
+ } catch (Exception e) {
+ stopRunning();
+ throw new FileException("error init stream for {}" + topic, e);
}
- return readerList;
}
- @Override
- protected String getThreadName() {
- return null;
+ private void initConnectOptions(InstanceProfile profile) {
+ options = new MqttConnectOptions();
+
options.setCleanSession(profile.getBoolean(TaskConstants.TASK_MQTT_CLEAN_SESSION,
false));
+
options.setConnectionTimeout(profile.getInt(TaskConstants.TASK_MQTT_CONNECTION_TIMEOUT,
10));
+
options.setKeepAliveInterval(profile.getInt(TaskConstants.TASK_MQTT_KEEPALIVE_INTERVAL,
20));
+ options.setUserName(profile.get(TaskConstants.TASK_MQTT_USERNAME, ""));
+ options.setPassword(profile.get(TaskConstants.TASK_MQTT_PASSWORD,
"").toCharArray());
+
options.setAutomaticReconnect(profile.getBoolean(TaskConstants.TASK_MQTT_AUTOMATIC_RECONNECT,
true));
+ options.setMqttVersion(
+ profile.getInt(TaskConstants.TASK_MQTT_VERSION,
MqttConnectOptions.MQTT_VERSION_DEFAULT));
}
- @Override
- protected void initSource(InstanceProfile profile) {
+ private void mqttConnect() {
+ try {
+ client = new MqttClient(serverURI, clientId, new
MemoryPersistence());
+ client.setCallback(new MqttCallback() {
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ LOGGER.error("the mqtt jobId:{}, serverURI:{}, connection
lost, {} ", instanceId,
+ serverURI, cause);
+ reconnect();
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message)
throws Exception {
+ Map<String, String> headerMap = new HashMap<>();
+ headerMap.put("record.topic", topic);
+ headerMap.put("record.messageId",
String.valueOf(message.getId()));
+ headerMap.put("record.qos",
String.valueOf(message.getQos()));
+ byte[] recordValue = message.getPayload();
+ mqttMessagesQueue.offer(new DefaultMessage(recordValue,
headerMap), 1, TimeUnit.SECONDS);
+
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ }
+ });
+ client.connect(options);
+ client.subscribe(topic, qos);
+ LOGGER.info("the mqtt subscribe topic is [{}], qos is [{}]",
topic, qos);
+ } catch (Exception e) {
+ LOGGER.error("init mqtt client error {}.
jobId:{},serverURI:{},clientId:{}", e, instanceId, serverURI,
+ clientId);
+ }
+ }
+ private void reconnect() {
+ if (!client.isConnected()) {
+ try {
+ client.connect(options);
+ LOGGER.info("the mqtt client reconnect success. jobId:{},
serverURI:{}, clientId:{}", instanceId,
+ serverURI, clientId);
+ } catch (Exception e) {
+ LOGGER.error("reconnect mqtt client error {}. jobId:{},
serverURI:{}, clientId:{}", e, instanceId,
+ serverURI, clientId);
+ }
+ }
+ }
+
+ private void disconnect() {
+ try {
+ client.disconnect();
+ } catch (MqttException e) {
+ LOGGER.error("disconnect mqtt client error {}.
jobId:{},serverURI:{},clientId:{}", e, instanceId, serverURI,
+ clientId);
+ }
}
@Override
protected void printCurrentState() {
-
+ LOGGER.info("mqtt topic is {}", topic);
}
@Override
protected boolean doPrepareToRead() {
- return false;
+ return true;
}
@Override
protected List<SourceData> readFromSource() {
- return null;
- }
-
- @Override
- public Message read() {
- return null;
+ List<SourceData> dataList = new ArrayList<>();
+ try {
+ int size = 0;
+ while (size < BATCH_READ_LINE_TOTAL_LEN) {
+ Message msg = mqttMessagesQueue.poll(1, TimeUnit.SECONDS);
+ if (msg != null) {
+ SourceData sourceData = new SourceData(msg.getBody(),
"0L");
+ size += sourceData.getData().length;
+ dataList.add(sourceData);
+ } else {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("poll {} data from mqtt queue interrupted.",
instanceId);
+ }
+ return dataList;
}
@Override
@@ -117,16 +192,14 @@ public class MqttSource extends AbstractSource {
@Override
protected void releaseSource() {
-
- }
-
- @Override
- public boolean sourceFinish() {
- return false;
+ LOGGER.info("release mqtt source");
+ if (client.isConnected()) {
+ disconnect();
+ }
}
@Override
public boolean sourceExist() {
- return false;
+ return true;
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
index 253328bdce..45660748c2 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.plugin.sources.reader;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
@@ -43,19 +44,6 @@ public class MqttReader extends AbstractReader {
private static final Logger LOGGER =
LoggerFactory.getLogger(MqttReader.class);
- public static final String JOB_MQTT_USERNAME = "job.mqttJob.userName";
- public static final String JOB_MQTT_PASSWORD = "job.mqttJob.password";
- public static final String JOB_MQTT_SERVER_URI = "job.mqttJob.serverURI";
- public static final String JOB_MQTT_TOPIC = "job.mqttJob.topic";
- public static final String JOB_MQTT_CONNECTION_TIMEOUT =
"job.mqttJob.connectionTimeOut";
- public static final String JOB_MQTT_KEEPALIVE_INTERVAL =
"job.mqttJob.keepAliveInterval";
- public static final String JOB_MQTT_QOS = "job.mqttJob.qos";
- public static final String JOB_MQTT_CLEAN_SESSION =
"job.mqttJob.cleanSession";
- public static final String JOB_MQTT_CLIENT_ID_PREFIX =
"job.mqttJob.clientIdPrefix";
- public static final String JOB_MQTT_QUEUE_SIZE = "job.mqttJob.queueSize";
- public static final String JOB_MQTT_AUTOMATIC_RECONNECT =
"job.mqttJob.automaticReconnect";
- public static final String JOB_MQTT_VERSION = "job.mqttJob.mqttVersion";
-
private boolean finished = false;
private boolean destroyed = false;
@@ -88,22 +76,20 @@ public class MqttReader extends AbstractReader {
* @param jobConf
*/
private void setGlobalParamsValue(InstanceProfile jobConf) {
- mqttMessagesQueue = new
LinkedBlockingQueue<>(jobConf.getInt(JOB_MQTT_QUEUE_SIZE, 1000));
+ mqttMessagesQueue = new
LinkedBlockingQueue<>(jobConf.getInt(TaskConstants.TASK_MQTT_QUEUE_SIZE, 1000));
instanceId = jobConf.getInstanceId();
- userName = jobConf.get(JOB_MQTT_USERNAME);
- password = jobConf.get(JOB_MQTT_PASSWORD);
- serverURI = jobConf.get(JOB_MQTT_SERVER_URI);
- topic = jobConf.get(JOB_MQTT_TOPIC);
- clientId = jobConf.get(JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_"
+ UUID.randomUUID();
- cleanSession = jobConf.getBoolean(JOB_MQTT_CLEAN_SESSION, false);
- automaticReconnect = jobConf.getBoolean(JOB_MQTT_AUTOMATIC_RECONNECT,
true);
- qos = jobConf.getInt(JOB_MQTT_QOS, 1);
- mqttVersion = jobConf.getInt(JOB_MQTT_VERSION,
MqttConnectOptions.MQTT_VERSION_DEFAULT);
-
+ userName = jobConf.get(TaskConstants.TASK_MQTT_USERNAME);
+ password = jobConf.get(TaskConstants.TASK_MQTT_PASSWORD);
+ serverURI = jobConf.get(TaskConstants.TASK_MQTT_SERVER_URI);
+ clientId = jobConf.get(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX,
"mqtt_client") + "_" + UUID.randomUUID();
+ cleanSession =
jobConf.getBoolean(TaskConstants.TASK_MQTT_CLEAN_SESSION, false);
+ automaticReconnect =
jobConf.getBoolean(TaskConstants.TASK_MQTT_AUTOMATIC_RECONNECT, true);
+ qos = jobConf.getInt(TaskConstants.TASK_MQTT_QOS, 1);
+ mqttVersion = jobConf.getInt(TaskConstants.TASK_MQTT_VERSION,
MqttConnectOptions.MQTT_VERSION_DEFAULT);
options = new MqttConnectOptions();
options.setCleanSession(cleanSession);
-
options.setConnectionTimeout(jobConf.getInt(JOB_MQTT_CONNECTION_TIMEOUT, 10));
-
options.setKeepAliveInterval(jobConf.getInt(JOB_MQTT_KEEPALIVE_INTERVAL, 20));
+
options.setConnectionTimeout(jobConf.getInt(TaskConstants.TASK_MQTT_CONNECTION_TIMEOUT,
10));
+
options.setKeepAliveInterval(jobConf.getInt(TaskConstants.TASK_MQTT_KEEPALIVE_INTERVAL,
20));
options.setUserName(userName);
options.setPassword(password.toCharArray());
options.setAutomaticReconnect(automaticReconnect);
@@ -114,6 +100,7 @@ public class MqttReader extends AbstractReader {
* connect to MQTT Broker
*/
private void connect() {
+
try {
synchronized (MqttReader.class) {
client = new MqttClient(serverURI, clientId, new
MemoryPersistence());
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
new file mode 100644
index 0000000000..1d7d9a3dc2
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MqttTask extends AbstractTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MqttTask.class);
+
+ private String topic;
+
+ private int instanceLimit = DEFAULT_INSTANCE_LIMIT;
+
+ private AtomicBoolean isAdded = new AtomicBoolean(false);
+
+ public static final String DEFAULT_MQTT_INSTANCE =
"org.apache.inlong.agent.plugin.instance.MqttInstance";
+
+ private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
+
+ @Override
+ public boolean isProfileValid(TaskProfile profile) {
+ if (!profile.allRequiredKeyExist()) {
+ LOGGER.info("task profile needs all required key");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_MQTT_TOPIC)) {
+ LOGGER.info("task profile needs topic");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_MQTT_SERVER_URI)) {
+ LOGGER.info("task profile needs serverUri");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_MQTT_USERNAME)) {
+ LOGGER.info("task profile needs username");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_MQTT_PASSWORD)) {
+ LOGGER.info("task profile needs password");
+ return false;
+ }
+ return true;
+ }
+
+ protected void setInstanceLimit(int instanceLimit) {
+ this.instanceLimit = instanceLimit;
+ }
+
+ @Override
+ protected int getInstanceLimit() {
+ return instanceLimit;
+ }
+
+ @Override
+ protected void initTask() {
+ LOGGER.info("Mqtt commonInit: {}", taskProfile.toJsonStr());
+ topic = taskProfile.get(TaskConstants.TASK_MQTT_TOPIC);
+ }
+
+ @Override
+ protected List<InstanceProfile> getNewInstanceList() {
+ List<InstanceProfile> list = new ArrayList<>();
+ if (isAdded.get()) {
+ return list;
+ }
+ String dataTime = LocalDateTime.now().format(dateTimeFormatter);
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_MQTT_INSTANCE, topic,
+ CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ LOGGER.info("taskProfile.createInstanceProfile(mqtt): {}",
instanceProfile.toJsonStr());
+ list.add(instanceProfile);
+ isAdded.set(true);
+ return list;
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
index 89aa196ac2..8ca9785e6d 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
@@ -18,9 +18,9 @@
package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
-import org.apache.inlong.agent.plugin.sources.reader.MqttReader;
import org.apache.inlong.agent.utils.AgentUtils;
import org.junit.Ignore;
@@ -45,12 +45,12 @@ public class TestMqttConnect {
@Ignore
public void testMqttReader() throws Exception {
TaskProfile jobProfile = TaskProfile.parseJsonStr("{}");
- jobProfile.set(MqttReader.JOB_MQTT_SERVER_URI,
"tcp://broker.hivemq.com:1883");
- jobProfile.set(MqttReader.JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client");
- jobProfile.set(MqttReader.JOB_MQTT_USERNAME, "test");
- jobProfile.set(MqttReader.JOB_MQTT_PASSWORD, "test");
- jobProfile.set(MqttSource.JOB_MQTTJOB_TOPICS,
"testtopic/mqtt/p1/ebr/delivered,testtopic/NARTU2");
- jobProfile.set(MqttReader.JOB_MQTT_QOS, "0");
+ jobProfile.set(TaskConstants.TASK_MQTT_SERVER_URI,
"tcp://broker.hivemq.com:1883");
+ jobProfile.set(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX,
"mqtt_client");
+ jobProfile.set(TaskConstants.TASK_MQTT_USERNAME, "test");
+ jobProfile.set(TaskConstants.TASK_MQTT_PASSWORD, "test");
+ jobProfile.set(TaskConstants.TASK_MQTT_TOPIC,
"testtopic/mqtt/p1/ebr/delivered,testtopic/NARTU2");
+ jobProfile.set(TaskConstants.TASK_MQTT_QOS, "0");
jobProfile.set("job.instance.id", "_1");
final MqttSource source = new MqttSource();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java
index 2caadd5c9e..a2652869ba 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
@@ -106,13 +107,13 @@ public class TestMqttReader {
when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID),
anyString())).thenReturn(groupId);
when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID),
anyString())).thenReturn(streamId);
-
when(jobProfile.get(eq(MqttReader.JOB_MQTT_USERNAME))).thenReturn(username);
-
when(jobProfile.get(eq(MqttReader.JOB_MQTT_PASSWORD))).thenReturn(password);
-
when(jobProfile.get(eq(MqttReader.JOB_MQTT_SERVER_URI))).thenReturn(serverURI);
- when(jobProfile.get(eq(MqttReader.JOB_MQTT_QOS))).thenReturn(qos);
-
when(jobProfile.get(eq(MqttReader.JOB_MQTT_CLIENT_ID_PREFIX))).thenReturn(clientIdPrefix);
+
when(jobProfile.get(eq(TaskConstants.TASK_MQTT_USERNAME))).thenReturn(username);
+
when(jobProfile.get(eq(TaskConstants.TASK_MQTT_PASSWORD))).thenReturn(password);
+
when(jobProfile.get(eq(TaskConstants.TASK_MQTT_SERVER_URI))).thenReturn(serverURI);
+ when(jobProfile.get(eq(TaskConstants.TASK_MQTT_QOS))).thenReturn(qos);
+
when(jobProfile.get(eq(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX))).thenReturn(clientIdPrefix);
when(jobProfile.getInstanceId()).thenReturn(INSTANCE_ID);
- when(jobProfile.getInt(eq(MqttReader.JOB_MQTT_QUEUE_SIZE),
eq(1000))).thenReturn(1000);
+ when(jobProfile.getInt(eq(TaskConstants.TASK_MQTT_QUEUE_SIZE),
eq(1000))).thenReturn(1000);
// mock MqttClient
whenNew(MqttClient.class).withArguments(anyString(), anyString(),
any(MemoryPersistence.class))
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
index a956c07aac..aeb178cb33 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.common.metric.MetricItem;
@@ -90,7 +91,7 @@ public class TestMqttSource {
// build mock
when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID),
anyString())).thenReturn("test_group");
when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID),
anyString())).thenReturn("test_stream");
- when(jobProfile.get(eq(MqttSource.JOB_MQTTJOB_TOPICS),
eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
+ when(jobProfile.get(eq(TaskConstants.TASK_MQTT_TOPIC),
eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
topic1, topic2);
final MqttSource source = new MqttSource();