This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.2.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3aedfafe89ac8849fc9a581edd9addc9597cff6e
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Sep 27 09:54:43 2023 +0800

    Revert "[To rel/1.2][IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11185)"
    
    This reverts commit ab2e107b246686949c2313fbc914f256e2392042.
---
 .../config/constant/PipeConnectorConstant.java     |   6 -
 .../connector/protocol/opcua/OpcUaConnector.java   |  72 +++--------
 .../protocol/websocket/WebSocketConnector.java     |  56 ++-------
 .../connector/PipeConnectorSubtaskManager.java     | 132 +++++++--------------
 4 files changed, 72 insertions(+), 194 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 3414b7a9b29..8135ae577ad 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.pipe.config.constant;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 
 public class PipeConnectorConstant {
@@ -31,10 +29,6 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
   public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = 
"connector.node-urls";
 
-  public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = 
"connector.parallel.tasks";
-  public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
-      PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
-
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE 
= true;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index bc86efb6ec1..703e4652d99 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -28,11 +28,9 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
@@ -45,10 +43,7 @@ import 
org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
@@ -68,10 +63,6 @@ public class OpcUaConnector implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaConnector.class);
 
-  private static final Map<String, Pair<AtomicInteger, OpcUaServer>>
-      SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
-
-  private String serverKey;
   private OpcUaServer server;
 
   @Override
@@ -95,31 +86,14 @@ public class OpcUaConnector implements PipeConnector {
         parameters.getStringOrDefault(
             CONNECTOR_IOTDB_PASSWORD_KEY, 
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
 
-    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
-      serverKey = httpsBindPort + ":" + tcpBindPort;
-
-      server =
-          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP
-              .computeIfAbsent(
-                  serverKey,
-                  key -> {
-                    try {
-                      final OpcUaServer newServer =
-                          new OpcUaServerBuilder()
-                              .setTcpBindPort(tcpBindPort)
-                              .setHttpsBindPort(httpsBindPort)
-                              .setUser(user)
-                              .setPassword(password)
-                              .build();
-                      newServer.startup();
-                      return new Pair<>(new AtomicInteger(0), newServer);
-                    } catch (Exception e) {
-                      throw new PipeException("Failed to build and startup OpcUaServer", e);
-                    }
-                  })
-              .getRight();
-      
SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey).getLeft().incrementAndGet();
-    }
+    server =
+        new OpcUaServerBuilder()
+            .setTcpBindPort(tcpBindPort)
+            .setHttpsBindPort(httpsBindPort)
+            .setUser(user)
+            .setPassword(password)
+            .build();
+    server.startup();
   }
 
   @Override
@@ -132,11 +106,6 @@ public class OpcUaConnector implements PipeConnector {
     // Server side, do nothing
   }
 
-  @Override
-  public void transfer(Event event) throws Exception {
-    // Do nothing when receive heartbeat or other events
-  }
-
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
@@ -266,25 +235,12 @@ public class OpcUaConnector implements PipeConnector {
   }
 
   @Override
-  public void close() throws Exception {
-    if (serverKey == null) {
-      return;
-    }
-
-    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
-      final Pair<AtomicInteger, OpcUaServer> pair =
-          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey);
-      if (pair == null) {
-        return;
-      }
+  public void transfer(Event event) throws Exception {
+    // Do nothing when receive heartbeat or other events
+  }
 
-      if (pair.getLeft().decrementAndGet() <= 0) {
-        try {
-          pair.getRight().shutdown();
-        } finally {
-          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(serverKey);
-        }
-      }
-    }
+  @Override
+  public void close() throws Exception {
+    server.shutdown();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 9ee87b1df54..6df1bee24e2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -39,21 +39,15 @@ import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
 import java.util.Comparator;
-import java.util.Map;
 import java.util.Optional;
 import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class WebSocketConnector implements PipeConnector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketConnector.class);
-
-  private static final Map<Integer, Pair<AtomicInteger, 
WebSocketConnectorServer>>
-      PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
-
-  private Integer port;
-  private WebSocketConnectorServer server;
+  private final AtomicReference<WebSocketConnectorServer> server = new 
AtomicReference<>();
+  private int port;
 
   public final AtomicLong commitIdGenerator = new AtomicLong(0);
   private final AtomicLong lastCommitId = new AtomicLong(0);
@@ -74,19 +68,13 @@ public class WebSocketConnector implements PipeConnector {
 
   @Override
   public void handshake() throws Exception {
-    synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
-      server =
-          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP
-              .computeIfAbsent(
-                  port,
-                  key -> {
-                    final WebSocketConnectorServer newServer =
-                        new WebSocketConnectorServer(new 
InetSocketAddress(port), this);
-                    newServer.start();
-                    return new Pair<>(new AtomicInteger(0), newServer);
-                  })
-              .getRight();
-      
PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port).getLeft().incrementAndGet();
+    if (server.get() == null) {
+      synchronized (server) {
+        if (server.get() == null) {
+          server.set(new WebSocketConnectorServer(new InetSocketAddress(port), 
this));
+          server.get().start();
+        }
+      }
     }
   }
 
@@ -106,7 +94,7 @@ public class WebSocketConnector implements PipeConnector {
     long commitId = commitIdGenerator.incrementAndGet();
     ((EnrichedEvent) tabletInsertionEvent)
         .increaseReferenceCount(WebSocketConnector.class.getName());
-    server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
+    server.get().addEvent(new Pair<>(commitId, tabletInsertionEvent));
   }
 
   @Override
@@ -121,7 +109,7 @@ public class WebSocketConnector implements PipeConnector {
       for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
         long commitId = commitIdGenerator.incrementAndGet();
         ((EnrichedEvent) 
event).increaseReferenceCount(WebSocketConnector.class.getName());
-        server.addEvent(new Pair<>(commitId, event));
+        server.get().addEvent(new Pair<>(commitId, event));
       }
     } finally {
       tsFileInsertionEvent.close();
@@ -133,25 +121,7 @@ public class WebSocketConnector implements PipeConnector {
 
   @Override
   public void close() throws Exception {
-    if (port == null) {
-      return;
-    }
-
-    synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
-      final Pair<AtomicInteger, WebSocketConnectorServer> pair =
-          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port);
-      if (pair == null) {
-        return;
-      }
-
-      if (pair.getLeft().decrementAndGet() <= 0) {
-        try {
-          pair.getRight().stop();
-        } finally {
-          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(port);
-        }
-      }
-    }
+    server.get().stop();
   }
 
   public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index a6c0c59dac5..9fde95da3fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -38,22 +38,16 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.function.Supplier;
 
 public class PipeConnectorSubtaskManager {
 
-  private static final Map<String, Supplier<PipeConnector>> 
CONNECTOR_CONSTRUCTORS =
-      new HashMap<>();
-
   private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
       "Failed to deregister PipeConnectorSubtask. No such subtask: ";
 
-  private final Map<String, List<PipeConnectorSubtaskLifeCycle>>
+  private final Map<String, PipeConnectorSubtaskLifeCycle>
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
@@ -64,62 +58,55 @@ public class PipeConnectorSubtaskManager {
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      final int connectorNum =
-          pipeConnectorParameters.getIntOrDefault(
-              PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
-              
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
-      final List<PipeConnectorSubtaskLifeCycle> 
pipeConnectorSubtaskLifeCycleList =
-          new ArrayList<>(connectorNum);
-
+      // 1. Construct, validate and customize PipeConnector, and then 
handshake (create connection)
+      // with the target
       final String connectorKey =
           pipeConnectorParameters.getStringOrDefault(
               PipeConnectorConstant.CONNECTOR_KEY,
               BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-      // Shared pending queue for all subtasks
-      final BoundedBlockingPendingQueue<Event> pendingQueue =
-          new BoundedBlockingPendingQueue<>(
-              PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
 
-      for (int i = 0; i < connectorNum; i++) {
-        final PipeConnector pipeConnector =
-            CONNECTOR_CONSTRUCTORS
-                .getOrDefault(
-                    connectorKey,
-                    () -> 
PipeAgent.plugin().reflectConnector(pipeConnectorParameters))
-                .get();
-
-        // 1. Construct, validate and customize PipeConnector, and then 
handshake (create
-        // connection) with the target
-        try {
-          pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
-          pipeConnector.customize(
-              pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
-          pipeConnector.handshake();
-        } catch (Exception e) {
-          throw new PipeException(
-              "Failed to construct PipeConnector, because of " + 
e.getMessage(), e);
-        }
-
-        // 2. Construct PipeConnectorSubtaskLifeCycle to manage 
PipeConnectorSubtask's life cycle
-        final PipeConnectorSubtask pipeConnectorSubtask =
-            new PipeConnectorSubtask(
-                String.format(
-                    "%s_%s_%s", attributeSortedString, 
pipeRuntimeEnvironment.getCreationTime(), i),
-                pendingQueue,
-                pipeConnector);
-        final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
-            new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, 
pendingQueue);
-        pipeConnectorSubtaskLifeCycleList.add(pipeConnectorSubtaskLifeCycle);
+      PipeConnector pipeConnector;
+      if 
(connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+          || connectorKey.equals(
+              
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName())) {
+        pipeConnector = new IoTDBThriftSyncConnector();
+      } else if (connectorKey.equals(
+          BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName())) 
{
+        pipeConnector = new IoTDBThriftAsyncConnector();
+      } else if (connectorKey.equals(
+          BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) {
+        pipeConnector = new IoTDBLegacyPipeConnector();
+      } else if 
(connectorKey.equals(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName())) {
+        pipeConnector = new OpcUaConnector();
+      } else if 
(connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName()))
 {
+        pipeConnector = new WebSocketConnector();
+      } else {
+        pipeConnector = 
PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+      }
+
+      try {
+        pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
+        pipeConnector.customize(
+            pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
+        pipeConnector.handshake();
+      } catch (Exception e) {
+        throw new PipeException(
+            "Failed to construct PipeConnector, because of " + e.getMessage(), 
e);
       }
 
+      // 2. Construct PipeConnectorSubtaskLifeCycle to manage 
PipeConnectorSubtask's life cycle
+      final BoundedBlockingPendingQueue<Event> pendingQueue =
+          new BoundedBlockingPendingQueue<>(
+              PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
+      final PipeConnectorSubtask pipeConnectorSubtask =
+          new PipeConnectorSubtask(attributeSortedString, pendingQueue, 
pipeConnector);
+      final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
+          new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, 
pendingQueue);
       attributeSortedString2SubtaskLifeCycleMap.put(
-          attributeSortedString, pipeConnectorSubtaskLifeCycleList);
+          attributeSortedString, pipeConnectorSubtaskLifeCycle);
     }
 
-    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
-        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
-      lifeCycle.register();
-    }
+    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).register();
 
     return attributeSortedString;
   }
@@ -129,11 +116,7 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
-        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
-    lifeCycles.removeIf(PipeConnectorSubtaskLifeCycle::deregister);
-
-    if (lifeCycles.isEmpty()) {
+    if 
(attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister())
 {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
   }
@@ -143,10 +126,7 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
-        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
-      lifeCycle.start();
-    }
+    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
   }
 
   public synchronized void stop(String attributeSortedString) {
@@ -154,10 +134,7 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
-    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
-        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
-      lifeCycle.stop();
-    }
+    
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
   }
 
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
@@ -167,32 +144,13 @@ public class PipeConnectorSubtaskManager {
           "Failed to get PendingQueue. No such subtask: " + 
attributeSortedString);
     }
 
-    // All subtasks share the same pending queue
-    return attributeSortedString2SubtaskLifeCycleMap
-        .get(attributeSortedString)
-        .get(0)
-        .getPendingQueue();
+    return 
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
   }
 
   /////////////////////////  Singleton Instance Holder  
/////////////////////////
 
   private PipeConnectorSubtaskManager() {
-    CONNECTOR_CONSTRUCTORS.put(
-        BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
-        IoTDBThriftSyncConnector::new);
-    CONNECTOR_CONSTRUCTORS.put(
-        BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
-        IoTDBThriftSyncConnector::new);
-    CONNECTOR_CONSTRUCTORS.put(
-        BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
-        IoTDBThriftAsyncConnector::new);
-    CONNECTOR_CONSTRUCTORS.put(
-        BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
-        IoTDBLegacyPipeConnector::new);
-    CONNECTOR_CONSTRUCTORS.put(
-        BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), 
WebSocketConnector::new);
-    CONNECTOR_CONSTRUCTORS.put(
-        BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), 
OpcUaConnector::new);
+    // Empty constructor
   }
 
   private static class PipeSubtaskManagerHolder {

Reply via email to