This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 38b36006b2a [IOTDB-6117] Pipe: Optimize RPC requests from CN to DN. CN
send exactly one pipeMeta to each DN upon create/start/stop/drop pipe (#10875)
38b36006b2a is described below
commit 38b36006b2ae283451d5e2f74324a6fcc5b737d6
Author: 马子坤 <[email protected]>
AuthorDate: Fri Aug 18 13:58:19 2023 +0800
[IOTDB-6117] Pipe: Optimize RPC requests from CN to DN. CN send exactly one
pipeMeta to each DN upon create/start/stop/drop pipe (#10875)
Currently, CN sends pipeMeta of all existing pipes to each dn upon
create/start/stop/drop pipe, which may be time-comsuming.
In this commit, CN will send exactly one pipeMeta to each DN upon
create/start/stop/drop pipe.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../confignode/client/DataNodeRequestType.java | 3 +-
.../client/async/AsyncDataNodeClientPool.java | 9 +-
.../client/async/handlers/AsyncClientHandler.java | 3 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 9 ++
.../procedure/env/ConfigNodeProcedureEnv.java | 31 ++++-
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 31 ++++-
.../impl/pipe/task/CreatePipeProcedureV2.java | 9 +-
.../impl/pipe/task/DropPipeProcedureV2.java | 5 +-
.../impl/pipe/task/StartPipeProcedureV2.java | 3 +-
.../impl/pipe/task/StopPipeProcedureV2.java | 3 +-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 131 +++++++++++++++------
.../impl/DataNodeInternalRPCServiceImpl.java | 27 +++++
.../commons/pipe/task/meta/PipeMetaKeeper.java | 4 +
.../src/main/thrift/datanode.thrift | 10 ++
14 files changed, 226 insertions(+), 52 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 8fc3ffcbe37..6f5743cf826 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -67,7 +67,8 @@ public enum DataNodeRequestType {
DROP_PIPE_PLUGIN,
// Pipe Task
- PUSH_PIPE_META,
+ PIPE_PUSH_ALL_META,
+ PIPE_PUSH_SINGLE_META,
PIPE_HEARTBEAT,
// CQ
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 8ccba5457ac..7e8e52e6ff7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -63,6 +63,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -227,12 +228,18 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
break;
- case PUSH_PIPE_META:
+ case PIPE_PUSH_ALL_META:
client.pushPipeMeta(
(TPushPipeMetaReq) clientHandler.getRequest(requestId),
(PipePushMetaRPCHandler)
clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
break;
+ case PIPE_PUSH_SINGLE_META:
+ client.pushSinglePipeMeta(
+ (TPushSinglePipeMetaReq) clientHandler.getRequest(requestId),
+ (PipePushMetaRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
+ break;
case PIPE_HEARTBEAT:
client.pipeHeartbeat(
(TPipeHeartbeatReq) clientHandler.getRequest(requestId),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 21690788051..7796252199c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -210,7 +210,8 @@ public class AsyncClientHandler<Q, R> {
dataNodeLocationMap,
(Map<Integer, TPipeHeartbeatResp>) responseMap,
countDownLatch);
- case PUSH_PIPE_META:
+ case PIPE_PUSH_ALL_META:
+ case PIPE_PUSH_SINGLE_META:
return new PipePushMetaRPCHandler(
requestType,
requestId,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 28ade7c5f5d..7ffeeed3d77 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -273,6 +273,15 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
+ public PipeMeta getPipeMetaByPipeName(String pipeName) {
+ acquireReadLock();
+ try {
+ return pipeMetaKeeper.getPipeMetaByPipeName(pipeName);
+ } finally {
+ releaseReadLock();
+ }
+ }
+
public boolean isEmpty() {
acquireReadLock();
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index ee5fcccd681..26a94750e62 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -69,6 +69,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -660,14 +661,40 @@ public class ConfigNodeProcedureEnv {
return clientHandler.getResponseList();
}
- public Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(
+ public Map<Integer, TPushPipeMetaResp> pushAllPipeMetaToDataNodes(
List<ByteBuffer> pipeMetaBinaryList) {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
final TPushPipeMetaReq request = new
TPushPipeMetaReq().setPipeMetas(pipeMetaBinaryList);
final AsyncClientHandler<TPushPipeMetaReq, TPushPipeMetaResp>
clientHandler =
- new AsyncClientHandler<>(DataNodeRequestType.PUSH_PIPE_META, request,
dataNodeLocationMap);
+ new AsyncClientHandler<>(
+ DataNodeRequestType.PIPE_PUSH_ALL_META, request,
dataNodeLocationMap);
+
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPushPipeMetaResp>
pushSinglePipeMetaToDataNodes(ByteBuffer pipeMetaBinary) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPushSinglePipeMetaReq request = new
TPushSinglePipeMetaReq().setPipeMeta(pipeMetaBinary);
+
+ final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp>
clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.PIPE_PUSH_SINGLE_META, request,
dataNodeLocationMap);
+
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPushPipeMetaResp> dropSinglePipeOnDataNodes(String
pipeNameToDrop) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPushSinglePipeMetaReq request =
+ new TPushSinglePipeMetaReq().setPipeNameToDrop(pipeNameToDrop);
+
+ final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp>
clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.PIPE_PUSH_SINGLE_META, request,
dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
return clientHandler.getResponseMap();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index c0ef6d4bfc4..892859fd57e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -255,8 +255,7 @@ public abstract class AbstractOperatePipeProcedureV2
}
/**
- * Pushing all the pipeMeta's to all the dataNodes, forcing an update to the
the pipe's runtime
- * state.
+ * Pushing all the pipeMeta's to all the dataNodes, forcing an update to the
pipe's runtime state.
*
* @param env ConfigNodeProcedureEnv
* @return The responseMap after pushing pipe meta
@@ -269,7 +268,7 @@ public abstract class AbstractOperatePipeProcedureV2
pipeMetaBinaryList.add(pipeMeta.serialize());
}
- return env.pushPipeMetaToDataNodes(pipeMetaBinaryList);
+ return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
}
/**
@@ -334,6 +333,32 @@ public abstract class AbstractOperatePipeProcedureV2
}
}
+ /**
+ * Pushing one pipeMeta to all the dataNodes, forcing an update to the
pipe's runtime state.
+ *
+ * @param pipeName pipe name of the pipe to push
+ * @param env ConfigNodeProcedureEnv
+ * @return The responseMap after pushing pipe meta
+ * @throws IOException Exception when Serializing to byte buffer
+ */
+ protected Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes(
+ String pipeName, ConfigNodeProcedureEnv env) throws IOException {
+ return env.pushSinglePipeMetaToDataNodes(
+ pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
+ }
+
+ /**
+ * Drop a pipe on all the dataNodes.
+ *
+ * @param pipeName pipe name of the pipe to drop
+ * @param env ConfigNodeProcedureEnv
+ * @return The responseMap after pushing pipe meta
+ */
+ protected Map<Integer, TPushPipeMetaResp> dropSinglePipeOnDataNodes(
+ String pipeName, ConfigNodeProcedureEnv env) {
+ return env.dropSinglePipeOnDataNodes(pipeName);
+ }
+
@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 5aefe33c222..599637b8282 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -139,13 +139,11 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
@Override
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws PipeException, IOException {
- LOGGER.info(
- "CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
- createPipeRequest.getPipeName());
+ final String pipeName = createPipeRequest.getPipeName();
+ LOGGER.info("CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
String exceptionMessage =
- parsePushPipeMetaExceptionForPipe(
- createPipeRequest.getPipeName(), pushPipeMetaToDataNodes(env));
+ parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
throw new PipeException(
String.format(
@@ -190,6 +188,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
"CreatePipeProcedureV2: rollbackFromOperateOnDataNodes({})",
createPipeRequest.getPipeName());
+ // Push all pipe metas to datanode, may be time-consuming
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(
createPipeRequest.getPipeName(), pushPipeMetaToDataNodes(env));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 510d87d2763..ff8cd3e7f3f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -81,12 +81,11 @@ public class DropPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
- throws PipeException, IOException {
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws PipeException {
LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
String exceptionMessage =
- parsePushPipeMetaExceptionForPipe(pipeName,
pushPipeMetaToDataNodes(env));
+ parsePushPipeMetaExceptionForPipe(pipeName,
dropSinglePipeOnDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
throw new PipeException(
String.format("Failed to drop pipe %s, details: %s", pipeName,
exceptionMessage));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index eae446df24b..4272def5bc1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -89,7 +89,7 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
String exceptionMessage =
- parsePushPipeMetaExceptionForPipe(pipeName,
pushPipeMetaToDataNodes(env));
+ parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
throw new PipeException(
String.format("Failed to start pipe %s, details: %s", pipeName,
exceptionMessage));
@@ -130,6 +130,7 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})",
pipeName);
+ // Push all pipe metas to datanode, may be time-consuming
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushPipeMetaToDataNodes(env));
if (!exceptionMessage.isEmpty()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index a3a85c8364e..cf866b1d27d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -89,7 +89,7 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
String exceptionMessage =
- parsePushPipeMetaExceptionForPipe(pipeName,
pushPipeMetaToDataNodes(env));
+ parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
throw new PipeException(
String.format("Failed to stop pipe %s, details: %s", pipeName,
exceptionMessage));
@@ -126,6 +126,7 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})",
pipeName);
+ // Push all pipe metas to datanode, may be time-consuming
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushPipeMetaToDataNodes(env));
if (!exceptionMessage.isEmpty()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index da21b757444..e9ecb26d402 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -117,6 +117,65 @@ public class PipeTaskAgent {
////////////////////////// Pipe Task Management Entry
//////////////////////////
+ public synchronized TPushPipeMetaRespExceptionMessage
handleSinglePipeMetaChanges(
+ PipeMeta pipeMetaFromConfigNode) {
+ acquireWriteLock();
+ try {
+ return handleSinglePipeMetaChangesInternal(pipeMetaFromConfigNode);
+ } finally {
+ releaseWriteLock();
+ }
+ }
+
+ private TPushPipeMetaRespExceptionMessage
handleSinglePipeMetaChangesInternal(
+ PipeMeta pipeMetaFromConfigNode) {
+ // Do nothing if data node is removing or removed
+ if (PipeAgent.runtime().isShutdown()) {
+ return null;
+ }
+
+ try {
+ executeSinglePipeMetaChanges(pipeMetaFromConfigNode);
+ return null;
+ } catch (Exception e) {
+ final String pipeName =
pipeMetaFromConfigNode.getStaticMeta().getPipeName();
+ final String errorMessage =
+ String.format(
+ "Failed to handle single pipe meta changes for %s, because %s",
+ pipeName, e.getMessage());
+ LOGGER.warn("Failed to handle single pipe meta changes for {}",
pipeName, e);
+ return new TPushPipeMetaRespExceptionMessage(
+ pipeName, errorMessage, System.currentTimeMillis());
+ }
+ }
+
+ public synchronized TPushPipeMetaRespExceptionMessage handleDropPipe(String
pipeName) {
+ acquireWriteLock();
+ try {
+ return handleDropPipeInternal(pipeName);
+ } finally {
+ releaseWriteLock();
+ }
+ }
+
+ private TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String
pipeName) {
+ // Do nothing if data node is removing or removed
+ if (PipeAgent.runtime().isShutdown()) {
+ return null;
+ }
+
+ try {
+ dropPipe(pipeName);
+ return null;
+ } catch (Exception e) {
+ final String errorMessage =
+ String.format("Failed to drop pipe %s, because %s", pipeName,
e.getMessage());
+ LOGGER.warn("Failed to drop pipe {}", pipeName, e);
+ return new TPushPipeMetaRespExceptionMessage(
+ pipeName, errorMessage, System.currentTimeMillis());
+ }
+ }
+
public synchronized List<TPushPipeMetaRespExceptionMessage>
handlePipeMetaChanges(
List<PipeMeta> pipeMetaListFromConfigNode) {
acquireWriteLock();
@@ -139,41 +198,10 @@ public class PipeTaskAgent {
// Iterate through pipe meta list from config node, check if pipe meta
exists on data node
// or has changed
for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
- final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
-
try {
- final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
-
- // If pipe meta does not exist on data node, create a new pipe
- if (metaOnDataNode == null) {
- if (createPipe(metaFromConfigNode)) {
- // If the status recorded in config node is RUNNING, start the pipe
- startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
- }
- // If the status recorded in config node is STOPPED or DROPPED, do
nothing
- continue;
- }
-
- // If pipe meta exists on data node, check if it has changed
- final PipeStaticMeta staticMetaOnDataNode =
metaOnDataNode.getStaticMeta();
- final PipeStaticMeta staticMetaFromConfigNode =
metaFromConfigNode.getStaticMeta();
-
- // First check if pipe static meta has changed, if so, drop the pipe
and create a new one
- if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
- dropPipe(pipeName);
- if (createPipe(metaFromConfigNode)) {
- startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
- }
- // If the status is STOPPED or DROPPED, do nothing
- continue;
- }
-
- // Then check if pipe runtime meta has changed, if so, update the pipe
- final PipeRuntimeMeta runtimeMetaOnDataNode =
metaOnDataNode.getRuntimeMeta();
- final PipeRuntimeMeta runtimeMetaFromConfigNode =
metaFromConfigNode.getRuntimeMeta();
- handlePipeRuntimeMetaChanges(
- staticMetaFromConfigNode, runtimeMetaFromConfigNode,
runtimeMetaOnDataNode);
+ executeSinglePipeMetaChanges(metaFromConfigNode);
} catch (Exception e) {
+ final String pipeName =
metaFromConfigNode.getStaticMeta().getPipeName();
final String errorMessage =
String.format(
"Failed to handle pipe meta changes for %s, because %s",
pipeName, e.getMessage());
@@ -205,7 +233,42 @@ public class PipeTaskAgent {
return exceptionMessages;
}
- private void handlePipeRuntimeMetaChanges(
+ private void executeSinglePipeMetaChanges(final PipeMeta metaFromConfigNode)
{
+ final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
+ final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ // If pipe meta does not exist on data node, create a new pipe
+ if (metaOnDataNode == null) {
+ if (createPipe(metaFromConfigNode)) {
+ // If the status recorded in config node is RUNNING, start the pipe
+ startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
+ }
+ // If the status recorded in config node is STOPPED or DROPPED, do
nothing
+ return;
+ }
+
+ // If pipe meta exists on data node, check if it has changed
+ final PipeStaticMeta staticMetaOnDataNode = metaOnDataNode.getStaticMeta();
+ final PipeStaticMeta staticMetaFromConfigNode =
metaFromConfigNode.getStaticMeta();
+
+ // First check if pipe static meta has changed, if so, drop the pipe and
create a new one
+ if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
+ dropPipe(pipeName);
+ if (createPipe(metaFromConfigNode)) {
+ startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
+ }
+ // If the status is STOPPED or DROPPED, do nothing
+ return;
+ }
+
+ // Then check if pipe runtime meta has changed, if so, update the pipe
+ final PipeRuntimeMeta runtimeMetaOnDataNode =
metaOnDataNode.getRuntimeMeta();
+ final PipeRuntimeMeta runtimeMetaFromConfigNode =
metaFromConfigNode.getRuntimeMeta();
+ executeSinglePipeRuntimeMetaChanges(
+ staticMetaFromConfigNode, runtimeMetaFromConfigNode,
runtimeMetaOnDataNode);
+ }
+
+ private void executeSinglePipeRuntimeMetaChanges(
@NotNull PipeStaticMeta pipeStaticMeta,
@NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
@NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ddddd2f783d..86647205bc9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -181,6 +181,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
+import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -218,6 +219,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -956,6 +958,31 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
}
+ @Override
+ public TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req) {
+ try {
+ TPushPipeMetaRespExceptionMessage exceptionMessage;
+ if (req.isSetPipeNameToDrop()) {
+ exceptionMessage =
PipeAgent.task().handleDropPipe(req.getPipeNameToDrop());
+ } else if (req.isSetPipeMeta()) {
+ final PipeMeta pipeMeta =
PipeMeta.deserialize(ByteBuffer.wrap(req.getPipeMeta()));
+ exceptionMessage =
PipeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
+ } else {
+ throw new Exception("Invalid TPushSinglePipeMetaReq");
+ }
+ return exceptionMessage == null
+ ? new TPushPipeMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
+ : new TPushPipeMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
+
.setExceptionMessages(Collections.singletonList(exceptionMessage));
+ } catch (Exception e) {
+ LOGGER.error("Error occurred when pushing single pipe meta", e);
+ return new TPushPipeMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()));
+ }
+ }
+
@Override
public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws
TException {
final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 85c58ecc873..347ebe78199 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -85,6 +85,10 @@ public class PipeMetaKeeper {
return pipeNameToPipeMetaMap.values();
}
+ public PipeMeta getPipeMetaByPipeName(String pipeName) {
+ return pipeNameToPipeMetaMap.get(pipeName);
+ }
+
public void clear() {
this.pipeNameToPipeMetaMap.clear();
}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 99f2ed1419d..30e1e0d676f 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -406,6 +406,11 @@ struct TPushPipeMetaRespExceptionMessage {
3: required i64 timeStamp
}
+struct TPushSinglePipeMetaReq {
+ 1: optional binary pipeMeta // Should not set both to null.
+ 2: optional string pipeNameToDrop // If it is not null, pipe with indicated
name on datanode will be dropped.
+}
+
struct TConstructViewSchemaBlackListReq{
1: required list<common.TConsensusGroupId> schemaRegionIdList
2: required binary pathPatternTree
@@ -816,6 +821,11 @@ service IDataNodeRPCService {
*/
TPushPipeMetaResp pushPipeMeta(TPushPipeMetaReq req)
+ /**
+ * Send one pipeMeta to DataNodes, for create/start/stop/drop one pipe
+ */
+ TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req)
+
/**
* ConfigNode will ask DataNode for pipe meta in every few seconds
**/