This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ff78cbd42d [IOTDB-5845] Failed to register PipeRuntimeCoordinator to
loadPublisher in LoadManager (#9882)
ff78cbd42d is described below
commit ff78cbd42d0271ece84dc3e04f8d802f3316ef93
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 18 19:15:32 2023 +0800
[IOTDB-5845] Failed to register PipeRuntimeCoordinator to loadPublisher in
LoadManager (#9882)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../main/java/org/apache/iotdb/confignode/manager/ConfigManager.java | 5 ++++-
.../java/org/apache/iotdb/confignode/manager/load/LoadManager.java | 3 +--
.../apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java | 5 +++++
.../org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java | 4 ++++
.../java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java | 4 ++++
5 files changed, 18 insertions(+), 3 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a6877c1bdc..11d2bd2912 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -289,12 +289,15 @@ public class ConfigManager implements IManager {
this.udfManager = new UDFManager(this, udfInfo);
this.triggerManager = new TriggerManager(this, triggerInfo);
this.cqManager = new CQManager(this);
- this.loadManager = new LoadManager(this);
this.modelManager = new ModelManager(this, modelInfo);
this.pipeManager = new PipeManager(this, pipeInfo);
this.retryFailedTasksThread = new RetryFailedTasksThread(this);
this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
+
+ // Please keep loadManager initializing at last because it may require other managers to
+ // register the eventBus
+ this.loadManager = new LoadManager(this);
}
public void initConsensusManager() throws IOException {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 58800b4229..d88ebd70fe 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -88,8 +88,7 @@ public class LoadManager {
new StatisticsService(configManager, routeBalancer, loadCache,
loadPublisher);
loadPublisher.register(statisticsService);
- // TODO: enable
- // loadPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
+ loadPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
}
/**
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
index c590fe6fcd..9306551313 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
@@ -52,6 +52,11 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
@Override
public void onRegionGroupLeaderChanged(RouteChangeEvent event) {
+ // if no pipe task, return
+ if (configManager.getPipeManager().getPipeTaskCoordinator().getPipeTaskInfo().isEmpty()) {
+ return;
+ }
+
// we only care about data region leader change
final Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap =
new HashMap<>();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 542e6bc682..528725b132 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -176,6 +176,10 @@ public class PipeTaskInfo implements SnapshotProcessor {
return pipeMetaKeeper.getPipeMetaList();
}
+ public boolean isEmpty() {
+ return pipeMetaKeeper.isEmpty();
+ }
+
/////////////////////////////// Pipe Runtime Management
///////////////////////////////
/** handle the data region leader change event and update the pipe task meta
accordingly */
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index d5f0afb918..64b28dd1dd 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -59,6 +59,10 @@ public class PipeMetaKeeper {
this.pipeNameToPipeMetaMap.clear();
}
+ public boolean isEmpty() {
+ return pipeNameToPipeMetaMap.isEmpty();
+ }
+
public void processTakeSnapshot(FileOutputStream fileOutputStream) throws IOException {
ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), fileOutputStream);
for (Map.Entry<String, PipeMeta> entry : pipeNameToPipeMetaMap.entrySet())
{