This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ff78cbd42d [IOTDB-5845] Failed to register PipeRuntimeCoordinator to 
loadPublisher in LoadManager (#9882)
ff78cbd42d is described below

commit ff78cbd42d0271ece84dc3e04f8d802f3316ef93
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 18 19:15:32 2023 +0800

    [IOTDB-5845] Failed to register PipeRuntimeCoordinator to loadPublisher in 
LoadManager (#9882)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../main/java/org/apache/iotdb/confignode/manager/ConfigManager.java | 5 ++++-
 .../java/org/apache/iotdb/confignode/manager/load/LoadManager.java   | 3 +--
 .../apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java | 5 +++++
 .../org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java   | 4 ++++
 .../java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java | 4 ++++
 5 files changed, 18 insertions(+), 3 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a6877c1bdc..11d2bd2912 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -289,12 +289,15 @@ public class ConfigManager implements IManager {
     this.udfManager = new UDFManager(this, udfInfo);
     this.triggerManager = new TriggerManager(this, triggerInfo);
     this.cqManager = new CQManager(this);
-    this.loadManager = new LoadManager(this);
     this.modelManager = new ModelManager(this, modelInfo);
     this.pipeManager = new PipeManager(this, pipeInfo);
 
     this.retryFailedTasksThread = new RetryFailedTasksThread(this);
     this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
+
+    // Please keep loadManager initializing at last because it may require 
other managers to
+    // register the eventBus
+    this.loadManager = new LoadManager(this);
   }
 
   public void initConsensusManager() throws IOException {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 58800b4229..d88ebd70fe 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -88,8 +88,7 @@ public class LoadManager {
         new StatisticsService(configManager, routeBalancer, loadCache, 
loadPublisher);
 
     loadPublisher.register(statisticsService);
-    // TODO: enable
-    // 
loadPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
+    
loadPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
   }
 
   /**
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
index c590fe6fcd..9306551313 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
@@ -52,6 +52,11 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
 
   @Override
   public void onRegionGroupLeaderChanged(RouteChangeEvent event) {
+    // if no pipe task, return
+    if 
(configManager.getPipeManager().getPipeTaskCoordinator().getPipeTaskInfo().isEmpty())
 {
+      return;
+    }
+
     // we only care about data region leader change
     final Map<TConsensusGroupId, Pair<Integer, Integer>> 
dataRegionGroupToOldAndNewLeaderPairMap =
         new HashMap<>();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 542e6bc682..528725b132 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -176,6 +176,10 @@ public class PipeTaskInfo implements SnapshotProcessor {
     return pipeMetaKeeper.getPipeMetaList();
   }
 
+  public boolean isEmpty() {
+    return pipeMetaKeeper.isEmpty();
+  }
+
   /////////////////////////////// Pipe Runtime Management 
///////////////////////////////
 
   /** handle the data region leader change event and update the pipe task meta 
accordingly */
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index d5f0afb918..64b28dd1dd 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -59,6 +59,10 @@ public class PipeMetaKeeper {
     this.pipeNameToPipeMetaMap.clear();
   }
 
+  public boolean isEmpty() {
+    return pipeNameToPipeMetaMap.isEmpty();
+  }
+
   public void processTakeSnapshot(FileOutputStream fileOutputStream) throws 
IOException {
     ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), fileOutputStream);
     for (Map.Entry<String, PipeMeta> entry : pipeNameToPipeMetaMap.entrySet()) 
{

Reply via email to