This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 03e8e51b5a0 Pipe: Removed the MQTT source (#16405)
03e8e51b5a0 is described below

commit 03e8e51b5a094abe6a477eb2766c5bf27bdd6756
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 15 09:44:37 2025 +0800

    Pipe: Removed the MQTT source (#16405)
    
    * removal
    
    * fix
    
    * fix
---
 .../PipeDataRegionSourceConstructor.java           |   4 -
 .../db/pipe/source/mqtt/MQTTPublishHandler.java    | 320 ---------------------
 .../iotdb/db/pipe/source/mqtt/MQTTSource.java      | 264 -----------------
 .../agent/plugin/builtin/BuiltinPipePlugin.java    |   3 -
 .../plugin/builtin/source/mqtt/MQTTSource.java     |  68 -----
 .../pipe/config/constant/PipeSourceConstant.java   |  28 --
 6 files changed, 687 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSourceConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSourceConstructor.java
index d67ef97913c..42caccb3e5f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSourceConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSourceConstructor.java
@@ -24,7 +24,6 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNot
 import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSourceConstructor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
-import org.apache.iotdb.db.pipe.source.mqtt.MQTTSource;
 
 class PipeDataRegionSourceConstructor extends PipeSourceConstructor {
 
@@ -43,8 +42,5 @@ class PipeDataRegionSourceConstructor extends 
PipeSourceConstructor {
         BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(), 
DoNothingSource::new);
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(), 
IoTDBDataRegionSource::new);
-
-    
pluginConstructors.put(BuiltinPipePlugin.MQTT_EXTRACTOR.getPipePluginName(), 
MQTTSource::new);
-    pluginConstructors.put(BuiltinPipePlugin.MQTT_SOURCE.getPipePluginName(), 
MQTTSource::new);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
deleted file mode 100644
index f1eb102b32e..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.source.mqtt;
-
-import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
-import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
-import 
org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent;
-import org.apache.iotdb.db.protocol.mqtt.Message;
-import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
-import org.apache.iotdb.db.protocol.mqtt.TableMessage;
-import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
-import org.apache.iotdb.db.protocol.session.IClientSession;
-import org.apache.iotdb.db.protocol.session.MqttClientSession;
-import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.plan.Coordinator;
-import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.utils.CommonUtils;
-import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-
-import io.moquette.interception.AbstractInterceptHandler;
-import io.moquette.interception.messages.InterceptConnectMessage;
-import io.moquette.interception.messages.InterceptDisconnectMessage;
-import io.moquette.interception.messages.InterceptPublishMessage;
-import io.netty.buffer.ByteBuf;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.utils.BitMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.ZoneId;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** PublishHandler handle the messages from MQTT clients. */
-public class MQTTPublishHandler extends AbstractInterceptHandler {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MQTTPublishHandler.class);
-
-  private final SessionManager sessionManager = SessionManager.getInstance();
-
-  private final ConcurrentHashMap<String, MqttClientSession> 
clientIdToSessionMap =
-      new ConcurrentHashMap<>();
-  private final PayloadFormatter payloadFormat;
-  private final boolean useTableInsert;
-  private final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue;
-  private final String pipeName;
-  private final long creationTime;
-  private final PipeTaskMeta pipeTaskMeta;
-
-  public MQTTPublishHandler(
-      final PayloadFormatter payloadFormat,
-      final PipeTaskSourceRuntimeEnvironment environment,
-      final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue) {
-    this.payloadFormat = payloadFormat;
-    useTableInsert = 
PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType());
-    pipeName = environment.getPipeName();
-    creationTime = environment.getCreationTime();
-    pipeTaskMeta = environment.getPipeTaskMeta();
-    this.pendingQueue = pendingQueue;
-  }
-
-  @Override
-  public String getID() {
-    return "mqtt-source-broker-listener";
-  }
-
-  @Override
-  public void onConnect(InterceptConnectMessage msg) {
-    if (msg.getClientID() == null || msg.getClientID().trim().isEmpty()) {
-      LOGGER.error(
-          "Connection refused: client_id is missing or empty. A valid 
client_id is required to establish a connection.");
-    }
-    if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
-      final MqttClientSession session = new 
MqttClientSession(msg.getClientID());
-      sessionManager.login(
-          session,
-          msg.getUsername(),
-          new String(msg.getPassword()),
-          ZoneId.systemDefault().toString(),
-          TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
-          ClientVersion.V_1_0,
-          useTableInsert ? IClientSession.SqlDialect.TABLE : 
IClientSession.SqlDialect.TREE);
-      sessionManager.registerSessionForMqtt(session);
-      clientIdToSessionMap.put(msg.getClientID(), session);
-    }
-  }
-
-  @Override
-  public void onDisconnect(InterceptDisconnectMessage msg) {
-    final MqttClientSession session = 
clientIdToSessionMap.remove(msg.getClientID());
-    if (null != session) {
-      sessionManager.removeCurrSessionForMqtt(session);
-      sessionManager.closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
-    }
-  }
-
-  @Override
-  public void onPublish(InterceptPublishMessage msg) {
-    try {
-      final String clientId = msg.getClientID();
-      if (!clientIdToSessionMap.containsKey(clientId)) {
-        return;
-      }
-      final MqttClientSession session = 
clientIdToSessionMap.get(msg.getClientID());
-      final ByteBuf payload = msg.getPayload();
-      final String topic = msg.getTopicName();
-      final String username = msg.getUsername();
-      final MqttQoS qos = msg.getQos();
-
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "Receive publish message. clientId: {}, username: {}, qos: {}, 
topic: {}, payload: {}",
-            clientId,
-            username,
-            qos,
-            topic,
-            payload);
-      }
-
-      final List<Message> messages = payloadFormat.format(topic, payload);
-      if (messages == null) {
-        return;
-      }
-
-      for (Message message : messages) {
-        if (message == null) {
-          continue;
-        }
-        if (useTableInsert) {
-          extractTable((TableMessage) message, session);
-        } else {
-          extractTree((TreeMessage) message, session);
-        }
-      }
-    } catch (Throwable t) {
-      LOGGER.warn("onPublish execution exception, msg is {}, error is ", msg, 
t);
-    } finally {
-      // release the payload of the message
-      super.onPublish(msg);
-    }
-  }
-
-  private void extractTable(final TableMessage message, final 
MqttClientSession session) {
-    try {
-      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
-      InsertTabletStatement insertTabletStatement = 
constructInsertTabletStatement(message);
-      session.setSqlDialect(IClientSession.SqlDialect.TABLE);
-      final EnrichedEvent event =
-          new PipeStatementInsertionEvent(
-              pipeName,
-              creationTime,
-              pipeTaskMeta,
-              null,
-              null,
-              session.getUsername(),
-              true,
-              true,
-              message.getDatabase().toLowerCase(),
-              insertTabletStatement);
-      if (!event.increaseReferenceCount(MQTTPublishHandler.class.getName())) {
-        LOGGER.warn("The reference count of the event {} cannot be increased, 
skipping it.", event);
-        return;
-      }
-      pendingQueue.waitedOffer(event);
-    } catch (Exception e) {
-      LOGGER.warn(
-          "meet error when polling mqtt source message database {}, table {}, 
tags {}, attributes {}, fields {}, at time {}, because {}",
-          message.getDatabase(),
-          message.getTable(),
-          message.getTagKeys(),
-          message.getAttributeKeys(),
-          message.getFields(),
-          message.getTimestamp(),
-          e.getMessage(),
-          e);
-    }
-  }
-
-  private InsertTabletStatement constructInsertTabletStatement(TableMessage 
message)
-      throws IllegalPathException {
-    InsertTabletStatement statement = new InsertTabletStatement();
-    statement.setDevicePath(
-        
DataNodeDevicePathCache.getInstance().getPartialPath(message.getTable()));
-    List<String> measurements =
-        Stream.of(message.getFields(), message.getTagKeys(), 
message.getAttributeKeys())
-            .flatMap(List::stream)
-            .collect(Collectors.toList());
-    statement.setMeasurements(measurements.toArray(new String[0]));
-    long[] timestamps = new long[] {message.getTimestamp()};
-    statement.setTimes(timestamps);
-    int columnSize = measurements.size();
-    int rowSize = 1;
-
-    BitMap[] bitMaps = new BitMap[columnSize];
-    Object[] columns =
-        Stream.of(message.getValues(), message.getTagValues(), 
message.getAttributeValues())
-            .flatMap(List::stream)
-            .toArray(Object[]::new);
-    statement.setColumns(columns);
-    statement.setBitMaps(bitMaps);
-    statement.setRowCount(rowSize);
-    statement.setAligned(false);
-    statement.setWriteToTable(true);
-    TSDataType[] dataTypes = new TSDataType[measurements.size()];
-    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[measurements.size()];
-    for (int i = 0; i < message.getFields().size(); i++) {
-      dataTypes[i] = message.getDataTypes().get(i);
-      columnCategories[i] = TsTableColumnCategory.FIELD;
-    }
-    for (int i = message.getFields().size();
-        i < message.getFields().size() + message.getTagKeys().size();
-        i++) {
-      dataTypes[i] = TSDataType.STRING;
-      columnCategories[i] = TsTableColumnCategory.TAG;
-    }
-    for (int i = message.getFields().size() + message.getTagKeys().size();
-        i
-            < message.getFields().size()
-                + message.getTagKeys().size()
-                + message.getAttributeKeys().size();
-        i++) {
-      dataTypes[i] = TSDataType.STRING;
-      columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
-    }
-    statement.setDataTypes(dataTypes);
-    statement.setColumnCategories(columnCategories);
-
-    return statement;
-  }
-
-  private void extractTree(final TreeMessage message, final MqttClientSession 
session) {
-    try {
-      InsertRowStatement statement = new InsertRowStatement();
-      statement.setDevicePath(
-          
DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice()));
-      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
-      statement.setTime(message.getTimestamp());
-      statement.setMeasurements(message.getMeasurements().toArray(new 
String[0]));
-      if (message.getDataTypes() == null) {
-        statement.setDataTypes(new 
TSDataType[message.getMeasurements().size()]);
-        statement.setValues(message.getValues().toArray(new Object[0]));
-        statement.setNeedInferType(true);
-      } else {
-        List<TSDataType> dataTypes = message.getDataTypes();
-        List<String> values = message.getValues();
-        Object[] inferredValues = new Object[values.size()];
-        for (int i = 0; i < values.size(); ++i) {
-          inferredValues[i] =
-              values.get(i) == null
-                  ? null
-                  : CommonUtils.parseValue(dataTypes.get(i), values.get(i));
-        }
-        statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
-        statement.setValues(inferredValues);
-      }
-      statement.setAligned(false);
-      final EnrichedEvent event =
-          new PipeStatementInsertionEvent(
-              pipeName,
-              creationTime,
-              pipeTaskMeta,
-              null,
-              null,
-              session.getUsername(),
-              true,
-              false,
-              message.getDevice(),
-              statement);
-      if (!event.increaseReferenceCount(MQTTPublishHandler.class.getName())) {
-        LOGGER.warn("The reference count of the event {} cannot be increased, 
skipping it.", event);
-        return;
-      }
-      pendingQueue.waitedOffer(event);
-    } catch (Exception e) {
-      LOGGER.warn(
-          "meet error when polling mqtt source device {}, measurements {}, at 
time {}, because {}",
-          message.getDevice(),
-          message.getMeasurements(),
-          message.getTimestamp(),
-          e.getMessage(),
-          e);
-    }
-  }
-
-  @Override
-  public void onSessionLoopError(Throwable throwable) {
-    LOGGER.warn(
-        "onSessionLoopError: {}",
-        throwable.getMessage() == null ? "null" : throwable.getMessage(),
-        throwable);
-  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
deleted file mode 100644
index f4d289b0ef6..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.source.mqtt;
-
-import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
-import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
-import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
-import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
-import org.apache.iotdb.db.protocol.mqtt.PayloadFormatManager;
-import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
-import org.apache.iotdb.pipe.api.PipeExtractor;
-import org.apache.iotdb.pipe.api.annotation.TableModel;
-import org.apache.iotdb.pipe.api.annotation.TreeModel;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
-
-import io.moquette.BrokerConstants;
-import io.moquette.broker.Server;
-import io.moquette.broker.config.IConfig;
-import io.moquette.broker.config.MemoryConfig;
-import io.moquette.broker.security.IAuthenticator;
-import io.moquette.interception.InterceptHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * MQTTExtractor is an external Extractor that uses the MQTT protocol to 
receive data. It starts an
- * MQTT broker and listens for incoming messages, which are then processed and 
passed to the pending
- * queue.
- */
-@TreeModel
-@TableModel
-public class MQTTSource implements PipeExtractor {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MQTTSource.class);
-
-  protected String pipeName;
-  protected long creationTime;
-  protected PipeTaskMeta pipeTaskMeta;
-  protected final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue =
-      new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter());
-
-  protected PayloadFormatter payloadFormat;
-  protected IConfig brokerConfig;
-  protected List<InterceptHandler> handlers;
-  protected IAuthenticator authenticator;
-  private final Server server = new Server();
-
-  protected final AtomicBoolean isClosed = new AtomicBoolean(false);
-  private final AtomicBoolean hasBeenStarted = new AtomicBoolean(false);
-
-  @Override
-  public void validate(final PipeParameterValidator validator) throws 
Exception {
-    if (!validator
-        .getParameters()
-        .getBooleanOrDefault(
-            Arrays.asList(
-                
PipeSourceConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY,
-                
PipeSourceConstant.EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY),
-            
PipeSourceConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE)) {
-      throw new PipeParameterNotValidException("single mode should be true in 
MQTT extractor");
-    }
-
-    validateMoquetteConfig(validator.getParameters());
-  }
-
-  public void validateMoquetteConfig(final PipeParameters parameters) {
-    final String sqlDialect =
-        parameters.getStringOrDefault(
-            SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE);
-    final String formatType =
-        parameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_PAYLOAD_FORMATTER_KEY,
-            SystemConstant.SQL_DIALECT_TREE_VALUE.equals(sqlDialect)
-                ? PipeSourceConstant.MQTT_PAYLOAD_FORMATTER_TREE_DIALECT_VALUE
-                : 
PipeSourceConstant.MQTT_PAYLOAD_FORMATTER_TABLE_DIALECT_VALUE);
-    try {
-      payloadFormat = PayloadFormatManager.getPayloadFormat(formatType);
-    } catch (IllegalArgumentException e) {
-      throw new PipeParameterNotValidException("Invalid payload format type: " 
+ formatType);
-    }
-
-    final String ip =
-        parameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_BROKER_HOST_KEY,
-            PipeSourceConstant.MQTT_BROKER_HOST_DEFAULT_VALUE);
-    final int port =
-        Integer.parseInt(
-            parameters.getStringOrDefault(
-                PipeSourceConstant.MQTT_BROKER_PORT_KEY,
-                PipeSourceConstant.MQTT_BROKER_PORT_DEFAULT_VALUE));
-    try (ServerSocket socket = new ServerSocket()) {
-      socket.bind(new InetSocketAddress(ip, port));
-    } catch (IOException e) {
-      throw new PipeParameterNotValidException(
-          "Cannot bind MQTT broker to "
-              + ip
-              + ":"
-              + port
-              + ". The port might already be in use, or the IP address is 
invalid.");
-    }
-
-    final String dataPath =
-        parameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_DATA_PATH_PROPERTY_NAME_KEY,
-            PipeSourceConstant.MQTT_DATA_PATH_PROPERTY_NAME_DEFAULT_VALUE);
-    final File file = 
Paths.get(dataPath).resolve("moquette_store.h2").toAbsolutePath().toFile();
-    if (file.exists()) {
-      try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
-          FileChannel channel = raf.getChannel();
-          FileLock lock = channel.tryLock()) {
-        if (lock == null) {
-          throw new PipeParameterNotValidException(
-              " The data file is used by another MQTT Source or MQTT Service. "
-                  + "Please use another data path");
-        }
-      } catch (Exception e) {
-        throw new PipeParameterNotValidException(
-            " The data file is used by another MQTT Source or MQTT Service. "
-                + "Please use another data path");
-      }
-    }
-  }
-
-  @Override
-  public void customize(
-      final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
-      throws Exception {
-    final PipeTaskSourceRuntimeEnvironment environment =
-        (PipeTaskSourceRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
-    pipeName = environment.getPipeName();
-    creationTime = environment.getCreationTime();
-    pipeTaskMeta = environment.getPipeTaskMeta();
-    brokerConfig = createBrokerConfig(parameters);
-    handlers = new ArrayList<>(1);
-    handlers.add(new MQTTPublishHandler(payloadFormat, environment, 
pendingQueue));
-    authenticator = new BrokerAuthenticator();
-  }
-
-  private IConfig createBrokerConfig(final PipeParameters pipeParameters) {
-    final Properties properties = new Properties();
-    properties.setProperty(
-        BrokerConstants.HOST_PROPERTY_NAME,
-        pipeParameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_BROKER_HOST_KEY,
-            PipeSourceConstant.MQTT_BROKER_HOST_DEFAULT_VALUE));
-    properties.setProperty(
-        BrokerConstants.PORT_PROPERTY_NAME,
-        pipeParameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_BROKER_PORT_KEY,
-            PipeSourceConstant.MQTT_BROKER_PORT_DEFAULT_VALUE));
-    properties.setProperty(
-        BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE,
-        pipeParameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_KEY,
-            String.valueOf(
-                
PipeSourceConstant.MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_DEFAULT_VALUE)));
-    properties.setProperty(
-        BrokerConstants.DATA_PATH_PROPERTY_NAME,
-        pipeParameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_DATA_PATH_PROPERTY_NAME_KEY,
-            PipeSourceConstant.MQTT_DATA_PATH_PROPERTY_NAME_DEFAULT_VALUE));
-    properties.setProperty(
-        BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME,
-        pipeParameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_KEY,
-            String.valueOf(
-                
PipeSourceConstant.MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_DEFAULT_VALUE)));
-    properties.setProperty(
-        BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME,
-        pipeParameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_KEY,
-            
String.valueOf(PipeSourceConstant.MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_DEFAULT_VALUE)));
-    properties.setProperty(
-        BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME,
-        pipeParameters.getStringOrDefault(
-            
PipeSourceConstant.MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_KEY,
-            String.valueOf(
-                
PipeSourceConstant.MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_DEFAULT_VALUE)));
-    properties.setProperty(
-        BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME,
-        pipeParameters.getStringOrDefault(
-            PipeSourceConstant.MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_KEY,
-            
String.valueOf(PipeSourceConstant.MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_DEFAULT_VALUE)));
-    return new MemoryConfig(properties);
-  }
-
-  @Override
-  public void start() throws Exception {
-    if (hasBeenStarted.get()) {
-      return;
-    }
-    hasBeenStarted.set(true);
-
-    try {
-      server.startServer(brokerConfig, handlers, null, authenticator, null);
-    } catch (Exception e) {
-      throw new PipeException(
-          "Failed to start MQTT Extractor "
-              + pipeName
-              + ". Possible reasons: the data file might be used by another 
MQTT Source or MQTT Service, "
-              + "the port might already be in use, or there could be other 
issues. "
-              + "Please check the logs for more details.",
-          e);
-    }
-
-    LOGGER.info(
-        "Start MQTT Extractor successfully, listening on ip {}, port {}",
-        brokerConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME),
-        brokerConfig.getProperty(BrokerConstants.PORT_PROPERTY_NAME));
-  }
-
-  @Override
-  public Event supply() throws Exception {
-    return isClosed.get() ? null : pendingQueue.directPoll();
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (!isClosed.get() && hasBeenStarted.get()) {
-      server.stopServer();
-      isClosed.set(true);
-    }
-  }
-}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 8da1e50a4d5..74bc0d9815a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -44,7 +44,6 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.websocket.WebSock
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.writeback.WriteBackSink;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.iotdb.IoTDBSource;
-import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.mqtt.MQTTSource;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -56,11 +55,9 @@ public enum BuiltinPipePlugin {
   // extractors
   DO_NOTHING_EXTRACTOR("do-nothing-extractor", DoNothingSource.class),
   IOTDB_EXTRACTOR("iotdb-extractor", IoTDBSource.class),
-  MQTT_EXTRACTOR("mqtt-extractor", MQTTSource.class),
 
   DO_NOTHING_SOURCE("do-nothing-source", DoNothingSource.class),
   IOTDB_SOURCE("iotdb-source", IoTDBSource.class),
-  MQTT_SOURCE("mqtt-source", MQTTSource.class),
 
   // processors
   DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/source/mqtt/MQTTSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/source/mqtt/MQTTSource.java
deleted file mode 100644
index 8e942824cdd..00000000000
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/source/mqtt/MQTTSource.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.mqtt;
-
-import org.apache.iotdb.pipe.api.PipeExtractor;
-import org.apache.iotdb.pipe.api.annotation.TableModel;
-import org.apache.iotdb.pipe.api.annotation.TreeModel;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-
-/**
- * This class is a placeholder and should not be initialized. It represents an 
external Extractor
- * that uses the MQTT protocol to receive data. There is a real implementation 
in the server module
- * but cannot be imported here. The pipe agent in the server module will 
replace this class with the
- * real implementation when initializing the MQTTExtractor.
- */
-@TreeModel
-@TableModel
-public class MQTTSource implements PipeExtractor {
-
-  private static final String PLACEHOLDER_ERROR_MSG =
-      "This class is a placeholder and should not be used.";
-
-  @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void customize(PipeParameters parameters, 
PipeExtractorRuntimeConfiguration configuration)
-      throws Exception {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void start() throws Exception {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public Event supply() throws Exception {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-
-  @Override
-  public void close() throws Exception {
-    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
-  }
-}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
index 3887599ac54..24cc826df96 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
@@ -170,34 +170,6 @@ public class PipeSourceConstant {
   public static final String EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY = 
"source.single-mode";
   public static final boolean 
EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE = true;
 
-  public static final String MQTT_BROKER_HOST_KEY = "mqtt.host";
-  public static final String MQTT_BROKER_HOST_DEFAULT_VALUE = "127.0.0.1";
-  public static final String MQTT_BROKER_PORT_KEY = "mqtt.port";
-  public static final String MQTT_BROKER_PORT_DEFAULT_VALUE = "1883";
-
-  public static final String MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_KEY = 
"mqtt.pool-size";
-  public static final int 
MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_DEFAULT_VALUE = 1;
-
-  public static final String MQTT_DATA_PATH_PROPERTY_NAME_KEY = 
"mqtt.data-path";
-  public static final String MQTT_DATA_PATH_PROPERTY_NAME_DEFAULT_VALUE = 
"data/";
-
-  public static final String MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_KEY = 
"mqtt.immediate-flush";
-  public static final boolean 
MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_DEFAULT_VALUE = true;
-
-  public static final String MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_KEY = 
"mqtt.allow-anonymous";
-  public static final boolean MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_DEFAULT_VALUE 
= false;
-
-  public static final String MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_KEY =
-      "mqtt.allow-zero-byte-client-id";
-  public static final boolean 
MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_DEFAULT_VALUE = true;
-
-  public static final String MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_KEY = 
"mqtt.max-message-size";
-  public static final long MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_DEFAULT_VALUE = 
1048576;
-
-  public static final String MQTT_PAYLOAD_FORMATTER_KEY = 
"mqtt.payload-formatter";
-  public static final String MQTT_PAYLOAD_FORMATTER_TREE_DIALECT_VALUE = 
"json";
-  public static final String MQTT_PAYLOAD_FORMATTER_TABLE_DIALECT_VALUE = 
"line";
-
   ///////////////////// pipe consensus /////////////////////
 
   public static final String EXTRACTOR_CONSENSUS_GROUP_ID_KEY = 
"extractor.consensus.group-id";


Reply via email to