This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_add_peer
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2f079aa0d199a6922b332c13e0d6d451542f2b49
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Sep 5 19:33:47 2022 +0800

    fix some bugs
---
 .../multileader/MultiLeaderConsensus.java          | 19 +++++++++-------
 .../multileader/MultiLeaderServerImpl.java         | 24 +++++++++++----------
 .../multileader/logdispatcher/IndexController.java | 25 ++++++++++++++++++----
 .../multileader/logdispatcher/LogDispatcher.java   |  7 +++++-
 .../statemachine/DataRegionStateMachine.java       |  6 ++++++
 .../iotdb/db/engine/snapshot/SnapshotLoader.java   |  6 ++++++
 .../SimpleFragmentParallelPlanner.java             |  2 +-
 7 files changed, 64 insertions(+), 25 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 0d56ed4971..c70d682f29 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -249,27 +249,27 @@ public class MultiLeaderConsensus implements IConsensus {
     }
     try {
       // step 1: inactive new Peer to prepare for following steps
-      logger.info("inactivate new peer: {}", peer);
+      logger.info("[MultiLeaderConsensus] inactivate new peer: {}", peer);
       impl.inactivePeer(peer);
 
       // step 2: notify all the other Peers to build the sync connection to newPeer
-      logger.info("notify current peers to build sync log...");
+      logger.info("[MultiLeaderConsensus] notify current peers to build sync log...");
       impl.notifyPeersToBuildSyncLogChannel(peer);
 
       // step 3: take snapshot
-      logger.info("start to take snapshot...");
+      logger.info("[MultiLeaderConsensus] start to take snapshot...");
       impl.takeSnapshot();
 
       // step 4: transit snapshot
-      logger.info("start to transit snapshot...");
+      logger.info("[MultiLeaderConsensus] start to transit snapshot...");
       impl.transitSnapshot(peer);
 
       // step 5: let the new peer load snapshot
-      logger.info("trigger new peer to load snapshot...");
+      logger.info("[MultiLeaderConsensus] trigger new peer to load snapshot...");
       impl.triggerSnapshotLoad(peer);
 
       // step 6: active new Peer
-      logger.info("activate new peer...");
+      logger.info("[MultiLeaderConsensus] activate new peer...");
       impl.activePeer(peer);
 
     } catch (ConsensusGroupAddPeerException e) {
@@ -293,9 +293,12 @@ public class MultiLeaderConsensus implements IConsensus {
     try {
       impl.notifyPeersToRemoveSyncLogChannel(peer);
     } catch (ConsensusGroupAddPeerException e) {
-      throw new RuntimeException(e);
+      return ConsensusGenericResponse.newBuilder()
+          .setSuccess(false)
+          .setException(new ConsensusException(e.getMessage()))
+          .build();
     }
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index b260da253b..b4054fb03e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -334,11 +334,11 @@ public class MultiLeaderServerImpl {
     // configuration
     List<Peer> currentMembers = new ArrayList<>(this.configuration);
     logger.info(
-        "notify current peers to build sync log. group member: {}, target: {}",
+        "[MultiLeaderConsensus] notify current peers to build sync log. group member: {}, target: {}",
         currentMembers,
         targetPeer);
     for (Peer peer : currentMembers) {
-      logger.info("build sync log channel from {}", peer);
+      logger.info("[MultiLeaderConsensus] build sync log channel from {}", peer);
       if (peer.equals(thisNode)) {
         // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
         // snapshot produced by thisNode
@@ -410,24 +410,26 @@ public class MultiLeaderServerImpl {
       throws ConsensusGroupAddPeerException {
     // step 1, build sync channel in LogDispatcher
     logger.info(
-        "start to build sync log channel to {} with initialSyncIndex {}",
+        "[MultiLeaderConsensus] build sync log channel to {} with initialSyncIndex {}",
         targetPeer,
         initialSyncIndex);
     logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
-    logger.info(
-        "[complete] add LogDispatcherThread to {} with initialSyncIndex {}",
-        targetPeer,
-        initialSyncIndex);
     // step 2, update configuration
     configuration.add(targetPeer);
+    logger.info("[MultiLeaderConsensus] persist new configuration: {}", configuration);
     persistConfigurationUpdate();
-    logger.info("[complete] persist new configuration: {}", configuration);
   }
 
   public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
-    logDispatcher.removeLogDispatcherThread(targetPeer);
-    configuration.remove(targetPeer);
-    persistConfigurationUpdate();
+    try {
+      logDispatcher.removeLogDispatcherThread(targetPeer);
+      logger.info("[MultiLeaderConsensus] log dispatcher to {} removed and cleanup", targetPeer);
+      configuration.remove(targetPeer);
+      persistConfigurationUpdate();
+      logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration);
+    } catch (IOException e) {
+      throw new ConsensusGroupAddPeerException("error when remove LogDispatcherThread", e);
+    }
   }
 
   public void persistConfiguration() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 26f2a9c002..3c65d0c879 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -101,11 +101,17 @@ public class IndexController {
     try {
       if (oldFile.exists()) {
         FileUtils.moveFile(oldFile, newFile);
+        logger.info(
+            "version file updated, previous: {}, current: {}",
+            oldFile.getAbsolutePath(),
+            newFile.getAbsolutePath());
+      } else {
+        // In the normal state, this branch should not be triggered.
+        logger.error(
+            "failed to flush sync index. cannot find previous version file. previous: {}",
+            lastFlushedIndex);
       }
-      logger.info(
-          "Version file updated, previous: {}, current: {}",
-          oldFile.getAbsolutePath(),
-          newFile.getAbsolutePath());
+
       lastFlushedIndex = flushIndex;
     } catch (IOException e) {
       logger.error("Error occurred when flushing next version", e);
@@ -143,6 +149,7 @@ public class IndexController {
       versionFile = new File(directory, prefix + initialIndex);
       try {
         Files.createFile(versionFile.toPath());
+        lastFlushedIndex = initialIndex;
       } catch (IOException e) {
         // TODO: (xingtanzjr) we need to handle the situation that file creation failed.
         //  Or the dispatcher won't run correctly
@@ -150,4 +157,14 @@ public class IndexController {
       }
     }
   }
+
+  public void cleanupVersionFiles() throws IOException {
+    File directory = new File(storageDir);
+    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
+    if (versionFiles != null && versionFiles.length > 0) {
+      for (File versionFile : versionFiles) {
+        Files.delete(versionFile.toPath());
+      }
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 1958627801..c5491cb702 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -118,7 +118,7 @@ public class LogDispatcher {
     executorService.submit(thread);
   }
 
-  public synchronized void removeLogDispatcherThread(Peer peer) {
+  public synchronized void removeLogDispatcherThread(Peer peer) throws IOException {
     if (stopped) {
       return;
     }
@@ -133,6 +133,7 @@ public class LogDispatcher {
       return;
     }
     threads.get(threadIndex).stop();
+    threads.get(threadIndex).cleanup();
     threads.remove(threadIndex);
   }
 
@@ -222,6 +223,10 @@ public class LogDispatcher {
       stopped = true;
     }
 
+    public void cleanup() throws IOException {
+      this.controller.cleanupVersionFiles();
+    }
+
     public boolean isStopped() {
       return stopped;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index c53bb069a7..bd0218d92e 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -30,6 +30,9 @@ import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.cache.BloomFilterCache;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
 import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
@@ -124,6 +127,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
     try {
       StorageEngineV2.getInstance()
           .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
+      ChunkCache.getInstance().clear();
+      TimeSeriesMetadataCache.getInstance().clear();
+      BloomFilterCache.getInstance().clear();
     } catch (Exception e) {
       logger.error("Exception occurs when replacing data region in storage engine.", e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
index cff5886f38..8c96b2b693 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
@@ -115,6 +115,12 @@ public class SnapshotLoader {
   private DataRegion loadSnapshotWithoutLog() {
     try {
       LOGGER.info("Moving snapshot file to data dirs");
+      try {
+        deleteAllFilesInDataDirs();
+        LOGGER.info("Remove all data files in original data dir");
+      } catch (IOException e) {
+        return null;
+      }
       createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath));
       return loadSnapshot();
     } catch (IOException | DiskSpaceInsufficientException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index d5c73f6822..800d1447d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -166,7 +166,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
 
   private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
     for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
-      if (endPoint.getIp().equals(dataNodeLocation.internalEndPoint.getIp())) {
+      if (endPoint.equals(dataNodeLocation.internalEndPoint)) {
         return false;
       }
     }

Reply via email to