This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rc/1.2.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3aedfafe89ac8849fc9a581edd9addc9597cff6e Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Sep 27 09:54:43 2023 +0800 Revert "[To rel/1.2][IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11185)" This reverts commit ab2e107b246686949c2313fbc914f256e2392042. --- .../config/constant/PipeConnectorConstant.java | 6 - .../connector/protocol/opcua/OpcUaConnector.java | 72 +++-------- .../protocol/websocket/WebSocketConnector.java | 56 ++------- .../connector/PipeConnectorSubtaskManager.java | 132 +++++++-------------- 4 files changed, 72 insertions(+), 194 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index 3414b7a9b29..8135ae577ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.pipe.config.constant; -import org.apache.iotdb.commons.pipe.config.PipeConfig; - import static org.apache.iotdb.commons.conf.IoTDBConstant.MB; public class PipeConnectorConstant { @@ -31,10 +29,6 @@ public class PipeConnectorConstant { public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port"; public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls"; - public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = "connector.parallel.tasks"; - public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE = - PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(); - public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable"; public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java index bc86efb6ec1..703e4652d99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java @@ -28,11 +28,9 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.record.Tablet; import org.eclipse.milo.opcua.sdk.server.OpcUaServer; @@ -45,10 +43,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY; @@ -68,10 +63,6 @@ public class OpcUaConnector implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaConnector.class); - private static final Map<String, Pair<AtomicInteger, OpcUaServer>> - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>(); - - private String serverKey; private OpcUaServer server; @Override @@ -95,31 +86,14 @@ public class OpcUaConnector implements PipeConnector { parameters.getStringOrDefault( CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); - synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) { - serverKey = httpsBindPort + ":" + tcpBindPort; - - server = - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP - .computeIfAbsent( - serverKey, - key -> { - try { - final OpcUaServer newServer = - new OpcUaServerBuilder() - .setTcpBindPort(tcpBindPort) - .setHttpsBindPort(httpsBindPort) - .setUser(user) - .setPassword(password) - .build(); - newServer.startup(); - return new Pair<>(new AtomicInteger(0), newServer); - } catch (Exception e) { - throw new PipeException("Failed to build and startup OpcUaServer", e); - } - }) - .getRight(); - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey).getLeft().incrementAndGet(); - } + server = + new OpcUaServerBuilder() + .setTcpBindPort(tcpBindPort) + .setHttpsBindPort(httpsBindPort) + .setUser(user) + .setPassword(password) + .build(); + server.startup(); } @Override @@ -132,11 +106,6 @@ public class OpcUaConnector implements PipeConnector { // Server side, do nothing } - @Override - public void transfer(Event event) throws Exception { - // Do nothing when receive heartbeat or other events - } - @Override public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent @@ -266,25 +235,12 @@ public class OpcUaConnector implements PipeConnector { } @Override - public void close() throws Exception { - if (serverKey == null) { - return; - } - - synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) { - final Pair<AtomicInteger, OpcUaServer> pair = - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey); - if (pair == null) { - return; - } + public void transfer(Event event) throws Exception { + // Do nothing when receive heartbeat or other events + } - if (pair.getLeft().decrementAndGet() <= 0) { - try { - pair.getRight().shutdown(); - } finally { - SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(serverKey); - } - } - } + @Override + public void close() throws Exception { + server.shutdown(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java index 9ee87b1df54..6df1bee24e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java @@ -39,21 +39,15 @@ import javax.annotation.Nullable; import java.net.InetSocketAddress; import java.util.Comparator; -import java.util.Map; import java.util.Optional; import java.util.PriorityQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; public class WebSocketConnector implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnector.class); - - private static final Map<Integer, Pair<AtomicInteger, WebSocketConnectorServer>> - PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>(); - - private Integer port; - private WebSocketConnectorServer server; + private final AtomicReference<WebSocketConnectorServer> server = new AtomicReference<>(); + private int port; public final AtomicLong commitIdGenerator = new AtomicLong(0); private final AtomicLong lastCommitId = new AtomicLong(0); @@ -74,19 +68,13 @@ public class WebSocketConnector implements PipeConnector { @Override public void handshake() throws Exception { - synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) { - server = - PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP - .computeIfAbsent( - port, - key -> { - final WebSocketConnectorServer newServer = - new WebSocketConnectorServer(new InetSocketAddress(port), this); - newServer.start(); - return new Pair<>(new AtomicInteger(0), newServer); - }) - .getRight(); - PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port).getLeft().incrementAndGet(); + if (server.get() == null) { + synchronized (server) { + if (server.get() == null) { + server.set(new WebSocketConnectorServer(new InetSocketAddress(port), this)); + server.get().start(); + } + } } } @@ -106,7 +94,7 @@ public class WebSocketConnector implements PipeConnector { long commitId = commitIdGenerator.incrementAndGet(); ((EnrichedEvent) tabletInsertionEvent) .increaseReferenceCount(WebSocketConnector.class.getName()); - server.addEvent(new Pair<>(commitId, tabletInsertionEvent)); + server.get().addEvent(new Pair<>(commitId, tabletInsertionEvent)); } @Override @@ -121,7 +109,7 @@ public class WebSocketConnector implements PipeConnector { for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { long commitId = commitIdGenerator.incrementAndGet(); ((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName()); - server.addEvent(new Pair<>(commitId, event)); + server.get().addEvent(new Pair<>(commitId, event)); } } finally { tsFileInsertionEvent.close(); @@ -133,25 +121,7 @@ public class WebSocketConnector implements PipeConnector { @Override public void close() throws Exception { - if (port == null) { - return; - } - - synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) { - final Pair<AtomicInteger, WebSocketConnectorServer> pair = - PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port); - if (pair == null) { - return; - } - - if (pair.getLeft().decrementAndGet() <= 0) { - try { - pair.getRight().stop(); - } finally { - PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(port); - } - } - } + server.get().stop(); } public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java index a6c0c59dac5..9fde95da3fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java @@ -38,22 +38,16 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.function.Supplier; public class PipeConnectorSubtaskManager { - private static final Map<String, Supplier<PipeConnector>> CONNECTOR_CONSTRUCTORS = - new HashMap<>(); - private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE = "Failed to deregister PipeConnectorSubtask. No such subtask: "; - private final Map<String, List<PipeConnectorSubtaskLifeCycle>> + private final Map<String, PipeConnectorSubtaskLifeCycle> attributeSortedString2SubtaskLifeCycleMap = new HashMap<>(); public synchronized String register( @@ -64,62 +58,55 @@ public class PipeConnectorSubtaskManager { new TreeMap<>(pipeConnectorParameters.getAttribute()).toString(); if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - final int connectorNum = - pipeConnectorParameters.getIntOrDefault( - PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, - PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); - final List<PipeConnectorSubtaskLifeCycle> pipeConnectorSubtaskLifeCycleList = - new ArrayList<>(connectorNum); - + // 1. Construct, validate and customize PipeConnector, and then handshake (create connection) + // with the target final String connectorKey = pipeConnectorParameters.getStringOrDefault( PipeConnectorConstant.CONNECTOR_KEY, BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()); - // Shared pending queue for all subtasks - final BoundedBlockingPendingQueue<Event> pendingQueue = - new BoundedBlockingPendingQueue<>( - PipeConfig.getInstance().getPipeConnectorPendingQueueSize()); - for (int i = 0; i < connectorNum; i++) { - final PipeConnector pipeConnector = - CONNECTOR_CONSTRUCTORS - .getOrDefault( - connectorKey, - () -> PipeAgent.plugin().reflectConnector(pipeConnectorParameters)) - .get(); - - // 1. Construct, validate and customize PipeConnector, and then handshake (create - // connection) with the target - try { - pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters)); - pipeConnector.customize( - pipeConnectorParameters, new PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment)); - pipeConnector.handshake(); - } catch (Exception e) { - throw new PipeException( - "Failed to construct PipeConnector, because of " + e.getMessage(), e); - } - - // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle - final PipeConnectorSubtask pipeConnectorSubtask = - new PipeConnectorSubtask( - String.format( - "%s_%s_%s", attributeSortedString, pipeRuntimeEnvironment.getCreationTime(), i), - pendingQueue, - pipeConnector); - final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle = - new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue); - pipeConnectorSubtaskLifeCycleList.add(pipeConnectorSubtaskLifeCycle); + PipeConnector pipeConnector; + if (connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()) + || connectorKey.equals( + BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName())) { + pipeConnector = new IoTDBThriftSyncConnector(); + } else if (connectorKey.equals( + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName())) { + pipeConnector = new IoTDBThriftAsyncConnector(); + } else if (connectorKey.equals( + BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) { + pipeConnector = new IoTDBLegacyPipeConnector(); + } else if (connectorKey.equals(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName())) { + pipeConnector = new OpcUaConnector(); + } else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) { + pipeConnector = new WebSocketConnector(); + } else { + pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters); + } + + try { + pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters)); + pipeConnector.customize( + pipeConnectorParameters, new PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment)); + pipeConnector.handshake(); + } catch (Exception e) { + throw new PipeException( + "Failed to construct PipeConnector, because of " + e.getMessage(), e); } + // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle + final BoundedBlockingPendingQueue<Event> pendingQueue = + new BoundedBlockingPendingQueue<>( + PipeConfig.getInstance().getPipeConnectorPendingQueueSize()); + final PipeConnectorSubtask pipeConnectorSubtask = + new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector); + final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle = + new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue); attributeSortedString2SubtaskLifeCycleMap.put( - attributeSortedString, pipeConnectorSubtaskLifeCycleList); + attributeSortedString, pipeConnectorSubtaskLifeCycle); } - for (final PipeConnectorSubtaskLifeCycle lifeCycle : - attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) { - lifeCycle.register(); - } + attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).register(); return attributeSortedString; } @@ -129,11 +116,7 @@ public class PipeConnectorSubtaskManager { throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); } - final List<PipeConnectorSubtaskLifeCycle> lifeCycles = - attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - lifeCycles.removeIf(PipeConnectorSubtaskLifeCycle::deregister); - - if (lifeCycles.isEmpty()) { + if (attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } } @@ -143,10 +126,7 @@ public class PipeConnectorSubtaskManager { throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); } - for (final PipeConnectorSubtaskLifeCycle lifeCycle : - attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) { - lifeCycle.start(); - } + attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start(); } public synchronized void stop(String attributeSortedString) { @@ -154,10 +134,7 @@ public class PipeConnectorSubtaskManager { throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); } - for (final PipeConnectorSubtaskLifeCycle lifeCycle : - attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) { - lifeCycle.stop(); - } + attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop(); } public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue( @@ -167,32 +144,13 @@ public class PipeConnectorSubtaskManager { "Failed to get PendingQueue. No such subtask: " + attributeSortedString); } - // All subtasks share the same pending queue - return attributeSortedString2SubtaskLifeCycleMap - .get(attributeSortedString) - .get(0) - .getPendingQueue(); + return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue(); } ///////////////////////// Singleton Instance Holder ///////////////////////// private PipeConnectorSubtaskManager() { - CONNECTOR_CONSTRUCTORS.put( - BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(), - IoTDBThriftSyncConnector::new); - CONNECTOR_CONSTRUCTORS.put( - BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(), - IoTDBThriftSyncConnector::new); - CONNECTOR_CONSTRUCTORS.put( - BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), - IoTDBThriftAsyncConnector::new); - CONNECTOR_CONSTRUCTORS.put( - BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), - IoTDBLegacyPipeConnector::new); - CONNECTOR_CONSTRUCTORS.put( - BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketConnector::new); - CONNECTOR_CONSTRUCTORS.put( - BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaConnector::new); + // Empty constructor } private static class PipeSubtaskManagerHolder {
