This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 03e8e51b5a0 Pipe: Removed the MQTT source (#16405)
03e8e51b5a0 is described below
commit 03e8e51b5a094abe6a477eb2766c5bf27bdd6756
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 15 09:44:37 2025 +0800
Pipe: Removed the MQTT source (#16405)
* removal
* fix
* fix
---
.../PipeDataRegionSourceConstructor.java | 4 -
.../db/pipe/source/mqtt/MQTTPublishHandler.java | 320 ---------------------
.../iotdb/db/pipe/source/mqtt/MQTTSource.java | 264 -----------------
.../agent/plugin/builtin/BuiltinPipePlugin.java | 3 -
.../plugin/builtin/source/mqtt/MQTTSource.java | 68 -----
.../pipe/config/constant/PipeSourceConstant.java | 28 --
6 files changed, 687 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSourceConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSourceConstructor.java
index d67ef97913c..42caccb3e5f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSourceConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSourceConstructor.java
@@ -24,7 +24,6 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNot
import
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSourceConstructor;
import
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
-import org.apache.iotdb.db.pipe.source.mqtt.MQTTSource;
class PipeDataRegionSourceConstructor extends PipeSourceConstructor {
@@ -43,8 +42,5 @@ class PipeDataRegionSourceConstructor extends
PipeSourceConstructor {
BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(),
DoNothingSource::new);
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(),
IoTDBDataRegionSource::new);
-
-
pluginConstructors.put(BuiltinPipePlugin.MQTT_EXTRACTOR.getPipePluginName(),
MQTTSource::new);
- pluginConstructors.put(BuiltinPipePlugin.MQTT_SOURCE.getPipePluginName(),
MQTTSource::new);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
deleted file mode 100644
index f1eb102b32e..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.source.mqtt;
-
-import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
-import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
-import
org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent;
-import org.apache.iotdb.db.protocol.mqtt.Message;
-import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
-import org.apache.iotdb.db.protocol.mqtt.TableMessage;
-import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
-import org.apache.iotdb.db.protocol.session.IClientSession;
-import org.apache.iotdb.db.protocol.session.MqttClientSession;
-import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.plan.Coordinator;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.utils.CommonUtils;
-import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-
-import io.moquette.interception.AbstractInterceptHandler;
-import io.moquette.interception.messages.InterceptConnectMessage;
-import io.moquette.interception.messages.InterceptDisconnectMessage;
-import io.moquette.interception.messages.InterceptPublishMessage;
-import io.netty.buffer.ByteBuf;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.utils.BitMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.ZoneId;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** PublishHandler handle the messages from MQTT clients. */
-public class MQTTPublishHandler extends AbstractInterceptHandler {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(MQTTPublishHandler.class);
-
- private final SessionManager sessionManager = SessionManager.getInstance();
-
- private final ConcurrentHashMap<String, MqttClientSession>
clientIdToSessionMap =
- new ConcurrentHashMap<>();
- private final PayloadFormatter payloadFormat;
- private final boolean useTableInsert;
- private final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue;
- private final String pipeName;
- private final long creationTime;
- private final PipeTaskMeta pipeTaskMeta;
-
- public MQTTPublishHandler(
- final PayloadFormatter payloadFormat,
- final PipeTaskSourceRuntimeEnvironment environment,
- final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue) {
- this.payloadFormat = payloadFormat;
- useTableInsert =
PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType());
- pipeName = environment.getPipeName();
- creationTime = environment.getCreationTime();
- pipeTaskMeta = environment.getPipeTaskMeta();
- this.pendingQueue = pendingQueue;
- }
-
- @Override
- public String getID() {
- return "mqtt-source-broker-listener";
- }
-
- @Override
- public void onConnect(InterceptConnectMessage msg) {
- if (msg.getClientID() == null || msg.getClientID().trim().isEmpty()) {
- LOGGER.error(
- "Connection refused: client_id is missing or empty. A valid
client_id is required to establish a connection.");
- }
- if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
- final MqttClientSession session = new
MqttClientSession(msg.getClientID());
- sessionManager.login(
- session,
- msg.getUsername(),
- new String(msg.getPassword()),
- ZoneId.systemDefault().toString(),
- TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
- ClientVersion.V_1_0,
- useTableInsert ? IClientSession.SqlDialect.TABLE :
IClientSession.SqlDialect.TREE);
- sessionManager.registerSessionForMqtt(session);
- clientIdToSessionMap.put(msg.getClientID(), session);
- }
- }
-
- @Override
- public void onDisconnect(InterceptDisconnectMessage msg) {
- final MqttClientSession session =
clientIdToSessionMap.remove(msg.getClientID());
- if (null != session) {
- sessionManager.removeCurrSessionForMqtt(session);
- sessionManager.closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
- }
- }
-
- @Override
- public void onPublish(InterceptPublishMessage msg) {
- try {
- final String clientId = msg.getClientID();
- if (!clientIdToSessionMap.containsKey(clientId)) {
- return;
- }
- final MqttClientSession session =
clientIdToSessionMap.get(msg.getClientID());
- final ByteBuf payload = msg.getPayload();
- final String topic = msg.getTopicName();
- final String username = msg.getUsername();
- final MqttQoS qos = msg.getQos();
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Receive publish message. clientId: {}, username: {}, qos: {},
topic: {}, payload: {}",
- clientId,
- username,
- qos,
- topic,
- payload);
- }
-
- final List<Message> messages = payloadFormat.format(topic, payload);
- if (messages == null) {
- return;
- }
-
- for (Message message : messages) {
- if (message == null) {
- continue;
- }
- if (useTableInsert) {
- extractTable((TableMessage) message, session);
- } else {
- extractTree((TreeMessage) message, session);
- }
- }
- } catch (Throwable t) {
- LOGGER.warn("onPublish execution exception, msg is {}, error is ", msg,
t);
- } finally {
- // release the payload of the message
- super.onPublish(msg);
- }
- }
-
- private void extractTable(final TableMessage message, final
MqttClientSession session) {
- try {
- TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
- InsertTabletStatement insertTabletStatement =
constructInsertTabletStatement(message);
- session.setSqlDialect(IClientSession.SqlDialect.TABLE);
- final EnrichedEvent event =
- new PipeStatementInsertionEvent(
- pipeName,
- creationTime,
- pipeTaskMeta,
- null,
- null,
- session.getUsername(),
- true,
- true,
- message.getDatabase().toLowerCase(),
- insertTabletStatement);
- if (!event.increaseReferenceCount(MQTTPublishHandler.class.getName())) {
- LOGGER.warn("The reference count of the event {} cannot be increased,
skipping it.", event);
- return;
- }
- pendingQueue.waitedOffer(event);
- } catch (Exception e) {
- LOGGER.warn(
- "meet error when polling mqtt source message database {}, table {},
tags {}, attributes {}, fields {}, at time {}, because {}",
- message.getDatabase(),
- message.getTable(),
- message.getTagKeys(),
- message.getAttributeKeys(),
- message.getFields(),
- message.getTimestamp(),
- e.getMessage(),
- e);
- }
- }
-
- private InsertTabletStatement constructInsertTabletStatement(TableMessage
message)
- throws IllegalPathException {
- InsertTabletStatement statement = new InsertTabletStatement();
- statement.setDevicePath(
-
DataNodeDevicePathCache.getInstance().getPartialPath(message.getTable()));
- List<String> measurements =
- Stream.of(message.getFields(), message.getTagKeys(),
message.getAttributeKeys())
- .flatMap(List::stream)
- .collect(Collectors.toList());
- statement.setMeasurements(measurements.toArray(new String[0]));
- long[] timestamps = new long[] {message.getTimestamp()};
- statement.setTimes(timestamps);
- int columnSize = measurements.size();
- int rowSize = 1;
-
- BitMap[] bitMaps = new BitMap[columnSize];
- Object[] columns =
- Stream.of(message.getValues(), message.getTagValues(),
message.getAttributeValues())
- .flatMap(List::stream)
- .toArray(Object[]::new);
- statement.setColumns(columns);
- statement.setBitMaps(bitMaps);
- statement.setRowCount(rowSize);
- statement.setAligned(false);
- statement.setWriteToTable(true);
- TSDataType[] dataTypes = new TSDataType[measurements.size()];
- TsTableColumnCategory[] columnCategories = new
TsTableColumnCategory[measurements.size()];
- for (int i = 0; i < message.getFields().size(); i++) {
- dataTypes[i] = message.getDataTypes().get(i);
- columnCategories[i] = TsTableColumnCategory.FIELD;
- }
- for (int i = message.getFields().size();
- i < message.getFields().size() + message.getTagKeys().size();
- i++) {
- dataTypes[i] = TSDataType.STRING;
- columnCategories[i] = TsTableColumnCategory.TAG;
- }
- for (int i = message.getFields().size() + message.getTagKeys().size();
- i
- < message.getFields().size()
- + message.getTagKeys().size()
- + message.getAttributeKeys().size();
- i++) {
- dataTypes[i] = TSDataType.STRING;
- columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
- }
- statement.setDataTypes(dataTypes);
- statement.setColumnCategories(columnCategories);
-
- return statement;
- }
-
- private void extractTree(final TreeMessage message, final MqttClientSession
session) {
- try {
- InsertRowStatement statement = new InsertRowStatement();
- statement.setDevicePath(
-
DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice()));
- TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
- statement.setTime(message.getTimestamp());
- statement.setMeasurements(message.getMeasurements().toArray(new
String[0]));
- if (message.getDataTypes() == null) {
- statement.setDataTypes(new
TSDataType[message.getMeasurements().size()]);
- statement.setValues(message.getValues().toArray(new Object[0]));
- statement.setNeedInferType(true);
- } else {
- List<TSDataType> dataTypes = message.getDataTypes();
- List<String> values = message.getValues();
- Object[] inferredValues = new Object[values.size()];
- for (int i = 0; i < values.size(); ++i) {
- inferredValues[i] =
- values.get(i) == null
- ? null
- : CommonUtils.parseValue(dataTypes.get(i), values.get(i));
- }
- statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
- statement.setValues(inferredValues);
- }
- statement.setAligned(false);
- final EnrichedEvent event =
- new PipeStatementInsertionEvent(
- pipeName,
- creationTime,
- pipeTaskMeta,
- null,
- null,
- session.getUsername(),
- true,
- false,
- message.getDevice(),
- statement);
- if (!event.increaseReferenceCount(MQTTPublishHandler.class.getName())) {
- LOGGER.warn("The reference count of the event {} cannot be increased,
skipping it.", event);
- return;
- }
- pendingQueue.waitedOffer(event);
- } catch (Exception e) {
- LOGGER.warn(
- "meet error when polling mqtt source device {}, measurements {}, at
time {}, because {}",
- message.getDevice(),
- message.getMeasurements(),
- message.getTimestamp(),
- e.getMessage(),
- e);
- }
- }
-
- @Override
- public void onSessionLoopError(Throwable throwable) {
- LOGGER.warn(
- "onSessionLoopError: {}",
- throwable.getMessage() == null ? "null" : throwable.getMessage(),
- throwable);
- }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
deleted file mode 100644
index f4d289b0ef6..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.source.mqtt;
-
-import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
-import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
-import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
-import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
-import org.apache.iotdb.db.protocol.mqtt.PayloadFormatManager;
-import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
-import org.apache.iotdb.pipe.api.PipeExtractor;
-import org.apache.iotdb.pipe.api.annotation.TableModel;
-import org.apache.iotdb.pipe.api.annotation.TreeModel;
-import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
-
-import io.moquette.BrokerConstants;
-import io.moquette.broker.Server;
-import io.moquette.broker.config.IConfig;
-import io.moquette.broker.config.MemoryConfig;
-import io.moquette.broker.security.IAuthenticator;
-import io.moquette.interception.InterceptHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * MQTTExtractor is an external Extractor that uses the MQTT protocol to
receive data. It starts an
- * MQTT broker and listens for incoming messages, which are then processed and
passed to the pending
- * queue.
- */
-@TreeModel
-@TableModel
-public class MQTTSource implements PipeExtractor {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(MQTTSource.class);
-
- protected String pipeName;
- protected long creationTime;
- protected PipeTaskMeta pipeTaskMeta;
- protected final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue =
- new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter());
-
- protected PayloadFormatter payloadFormat;
- protected IConfig brokerConfig;
- protected List<InterceptHandler> handlers;
- protected IAuthenticator authenticator;
- private final Server server = new Server();
-
- protected final AtomicBoolean isClosed = new AtomicBoolean(false);
- private final AtomicBoolean hasBeenStarted = new AtomicBoolean(false);
-
- @Override
- public void validate(final PipeParameterValidator validator) throws
Exception {
- if (!validator
- .getParameters()
- .getBooleanOrDefault(
- Arrays.asList(
-
PipeSourceConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY,
-
PipeSourceConstant.EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY),
-
PipeSourceConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE)) {
- throw new PipeParameterNotValidException("single mode should be true in
MQTT extractor");
- }
-
- validateMoquetteConfig(validator.getParameters());
- }
-
- public void validateMoquetteConfig(final PipeParameters parameters) {
- final String sqlDialect =
- parameters.getStringOrDefault(
- SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE);
- final String formatType =
- parameters.getStringOrDefault(
- PipeSourceConstant.MQTT_PAYLOAD_FORMATTER_KEY,
- SystemConstant.SQL_DIALECT_TREE_VALUE.equals(sqlDialect)
- ? PipeSourceConstant.MQTT_PAYLOAD_FORMATTER_TREE_DIALECT_VALUE
- :
PipeSourceConstant.MQTT_PAYLOAD_FORMATTER_TABLE_DIALECT_VALUE);
- try {
- payloadFormat = PayloadFormatManager.getPayloadFormat(formatType);
- } catch (IllegalArgumentException e) {
- throw new PipeParameterNotValidException("Invalid payload format type: "
+ formatType);
- }
-
- final String ip =
- parameters.getStringOrDefault(
- PipeSourceConstant.MQTT_BROKER_HOST_KEY,
- PipeSourceConstant.MQTT_BROKER_HOST_DEFAULT_VALUE);
- final int port =
- Integer.parseInt(
- parameters.getStringOrDefault(
- PipeSourceConstant.MQTT_BROKER_PORT_KEY,
- PipeSourceConstant.MQTT_BROKER_PORT_DEFAULT_VALUE));
- try (ServerSocket socket = new ServerSocket()) {
- socket.bind(new InetSocketAddress(ip, port));
- } catch (IOException e) {
- throw new PipeParameterNotValidException(
- "Cannot bind MQTT broker to "
- + ip
- + ":"
- + port
- + ". The port might already be in use, or the IP address is
invalid.");
- }
-
- final String dataPath =
- parameters.getStringOrDefault(
- PipeSourceConstant.MQTT_DATA_PATH_PROPERTY_NAME_KEY,
- PipeSourceConstant.MQTT_DATA_PATH_PROPERTY_NAME_DEFAULT_VALUE);
- final File file =
Paths.get(dataPath).resolve("moquette_store.h2").toAbsolutePath().toFile();
- if (file.exists()) {
- try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
- FileChannel channel = raf.getChannel();
- FileLock lock = channel.tryLock()) {
- if (lock == null) {
- throw new PipeParameterNotValidException(
- " The data file is used by another MQTT Source or MQTT Service. "
- + "Please use another data path");
- }
- } catch (Exception e) {
- throw new PipeParameterNotValidException(
- " The data file is used by another MQTT Source or MQTT Service. "
- + "Please use another data path");
- }
- }
- }
-
- @Override
- public void customize(
- final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
- throws Exception {
- final PipeTaskSourceRuntimeEnvironment environment =
- (PipeTaskSourceRuntimeEnvironment)
configuration.getRuntimeEnvironment();
- pipeName = environment.getPipeName();
- creationTime = environment.getCreationTime();
- pipeTaskMeta = environment.getPipeTaskMeta();
- brokerConfig = createBrokerConfig(parameters);
- handlers = new ArrayList<>(1);
- handlers.add(new MQTTPublishHandler(payloadFormat, environment,
pendingQueue));
- authenticator = new BrokerAuthenticator();
- }
-
- private IConfig createBrokerConfig(final PipeParameters pipeParameters) {
- final Properties properties = new Properties();
- properties.setProperty(
- BrokerConstants.HOST_PROPERTY_NAME,
- pipeParameters.getStringOrDefault(
- PipeSourceConstant.MQTT_BROKER_HOST_KEY,
- PipeSourceConstant.MQTT_BROKER_HOST_DEFAULT_VALUE));
- properties.setProperty(
- BrokerConstants.PORT_PROPERTY_NAME,
- pipeParameters.getStringOrDefault(
- PipeSourceConstant.MQTT_BROKER_PORT_KEY,
- PipeSourceConstant.MQTT_BROKER_PORT_DEFAULT_VALUE));
- properties.setProperty(
- BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE,
- pipeParameters.getStringOrDefault(
- PipeSourceConstant.MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_KEY,
- String.valueOf(
-
PipeSourceConstant.MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_DEFAULT_VALUE)));
- properties.setProperty(
- BrokerConstants.DATA_PATH_PROPERTY_NAME,
- pipeParameters.getStringOrDefault(
- PipeSourceConstant.MQTT_DATA_PATH_PROPERTY_NAME_KEY,
- PipeSourceConstant.MQTT_DATA_PATH_PROPERTY_NAME_DEFAULT_VALUE));
- properties.setProperty(
- BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME,
- pipeParameters.getStringOrDefault(
- PipeSourceConstant.MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_KEY,
- String.valueOf(
-
PipeSourceConstant.MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_DEFAULT_VALUE)));
- properties.setProperty(
- BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME,
- pipeParameters.getStringOrDefault(
- PipeSourceConstant.MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_KEY,
-
String.valueOf(PipeSourceConstant.MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_DEFAULT_VALUE)));
- properties.setProperty(
- BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME,
- pipeParameters.getStringOrDefault(
-
PipeSourceConstant.MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_KEY,
- String.valueOf(
-
PipeSourceConstant.MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_DEFAULT_VALUE)));
- properties.setProperty(
- BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME,
- pipeParameters.getStringOrDefault(
- PipeSourceConstant.MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_KEY,
-
String.valueOf(PipeSourceConstant.MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_DEFAULT_VALUE)));
- return new MemoryConfig(properties);
- }
-
- @Override
- public void start() throws Exception {
- if (hasBeenStarted.get()) {
- return;
- }
- hasBeenStarted.set(true);
-
- try {
- server.startServer(brokerConfig, handlers, null, authenticator, null);
- } catch (Exception e) {
- throw new PipeException(
- "Failed to start MQTT Extractor "
- + pipeName
- + ". Possible reasons: the data file might be used by another
MQTT Source or MQTT Service, "
- + "the port might already be in use, or there could be other
issues. "
- + "Please check the logs for more details.",
- e);
- }
-
- LOGGER.info(
- "Start MQTT Extractor successfully, listening on ip {}, port {}",
- brokerConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME),
- brokerConfig.getProperty(BrokerConstants.PORT_PROPERTY_NAME));
- }
-
- @Override
- public Event supply() throws Exception {
- return isClosed.get() ? null : pendingQueue.directPoll();
- }
-
- @Override
- public void close() throws Exception {
- if (!isClosed.get() && hasBeenStarted.get()) {
- server.stopServer();
- isClosed.set(true);
- }
- }
-}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 8da1e50a4d5..74bc0d9815a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -44,7 +44,6 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.websocket.WebSock
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.writeback.WriteBackSink;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.iotdb.IoTDBSource;
-import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.mqtt.MQTTSource;
import java.util.Arrays;
import java.util.Collections;
@@ -56,11 +55,9 @@ public enum BuiltinPipePlugin {
// extractors
DO_NOTHING_EXTRACTOR("do-nothing-extractor", DoNothingSource.class),
IOTDB_EXTRACTOR("iotdb-extractor", IoTDBSource.class),
- MQTT_EXTRACTOR("mqtt-extractor", MQTTSource.class),
DO_NOTHING_SOURCE("do-nothing-source", DoNothingSource.class),
IOTDB_SOURCE("iotdb-source", IoTDBSource.class),
- MQTT_SOURCE("mqtt-source", MQTTSource.class),
// processors
DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/source/mqtt/MQTTSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/source/mqtt/MQTTSource.java
deleted file mode 100644
index 8e942824cdd..00000000000
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/source/mqtt/MQTTSource.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.mqtt;
-
-import org.apache.iotdb.pipe.api.PipeExtractor;
-import org.apache.iotdb.pipe.api.annotation.TableModel;
-import org.apache.iotdb.pipe.api.annotation.TreeModel;
-import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-
-/**
- * This class is a placeholder and should not be initialized. It represents an
external Extractor
- * that uses the MQTT protocol to receive data. There is a real implementation
in the server module
- * but cannot be imported here. The pipe agent in the server module will
replace this class with the
- * real implementation when initializing the MQTTExtractor.
- */
-@TreeModel
-@TableModel
-public class MQTTSource implements PipeExtractor {
-
- private static final String PLACEHOLDER_ERROR_MSG =
- "This class is a placeholder and should not be used.";
-
- @Override
- public void validate(PipeParameterValidator validator) throws Exception {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void customize(PipeParameters parameters,
PipeExtractorRuntimeConfiguration configuration)
- throws Exception {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void start() throws Exception {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public Event supply() throws Exception {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void close() throws Exception {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
index 3887599ac54..24cc826df96 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
@@ -170,34 +170,6 @@ public class PipeSourceConstant {
public static final String EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY =
"source.single-mode";
public static final boolean
EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE = true;
- public static final String MQTT_BROKER_HOST_KEY = "mqtt.host";
- public static final String MQTT_BROKER_HOST_DEFAULT_VALUE = "127.0.0.1";
- public static final String MQTT_BROKER_PORT_KEY = "mqtt.port";
- public static final String MQTT_BROKER_PORT_DEFAULT_VALUE = "1883";
-
- public static final String MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_KEY =
"mqtt.pool-size";
- public static final int
MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_DEFAULT_VALUE = 1;
-
- public static final String MQTT_DATA_PATH_PROPERTY_NAME_KEY =
"mqtt.data-path";
- public static final String MQTT_DATA_PATH_PROPERTY_NAME_DEFAULT_VALUE =
"data/";
-
- public static final String MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_KEY =
"mqtt.immediate-flush";
- public static final boolean
MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_DEFAULT_VALUE = true;
-
- public static final String MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_KEY =
"mqtt.allow-anonymous";
- public static final boolean MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_DEFAULT_VALUE
= false;
-
- public static final String MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_KEY =
- "mqtt.allow-zero-byte-client-id";
- public static final boolean
MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_DEFAULT_VALUE = true;
-
- public static final String MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_KEY =
"mqtt.max-message-size";
- public static final long MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_DEFAULT_VALUE =
1048576;
-
- public static final String MQTT_PAYLOAD_FORMATTER_KEY =
"mqtt.payload-formatter";
- public static final String MQTT_PAYLOAD_FORMATTER_TREE_DIALECT_VALUE =
"json";
- public static final String MQTT_PAYLOAD_FORMATTER_TABLE_DIALECT_VALUE =
"line";
-
///////////////////// pipe consensus /////////////////////
public static final String EXTRACTOR_CONSENSUS_GROUP_ID_KEY =
"extractor.consensus.group-id";