This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch ml_add_peer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2f079aa0d199a6922b332c13e0d6d451542f2b49 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Sep 5 19:33:47 2022 +0800 fix some bugs --- .../multileader/MultiLeaderConsensus.java | 19 +++++++++------- .../multileader/MultiLeaderServerImpl.java | 24 +++++++++++---------- .../multileader/logdispatcher/IndexController.java | 25 ++++++++++++++++++---- .../multileader/logdispatcher/LogDispatcher.java | 7 +++++- .../statemachine/DataRegionStateMachine.java | 6 ++++++ .../iotdb/db/engine/snapshot/SnapshotLoader.java | 6 ++++++ .../SimpleFragmentParallelPlanner.java | 2 +- 7 files changed, 64 insertions(+), 25 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java index 0d56ed4971..c70d682f29 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java @@ -249,27 +249,27 @@ public class MultiLeaderConsensus implements IConsensus { } try { // step 1: inactive new Peer to prepare for following steps - logger.info("inactivate new peer: {}", peer); + logger.info("[MultiLeaderConsensus] inactivate new peer: {}", peer); impl.inactivePeer(peer); // step 2: notify all the other Peers to build the sync connection to newPeer - logger.info("notify current peers to build sync log..."); + logger.info("[MultiLeaderConsensus] notify current peers to build sync log..."); impl.notifyPeersToBuildSyncLogChannel(peer); // step 3: take snapshot - logger.info("start to take snapshot..."); + logger.info("[MultiLeaderConsensus] start to take snapshot..."); impl.takeSnapshot(); // step 4: transit snapshot - logger.info("start to transit snapshot..."); + logger.info("[MultiLeaderConsensus] start to transit snapshot..."); impl.transitSnapshot(peer); // step 5: let the new peer load snapshot - logger.info("trigger new peer to load snapshot..."); + logger.info("[MultiLeaderConsensus] trigger new peer to load snapshot..."); impl.triggerSnapshotLoad(peer); // step 6: active new Peer - logger.info("activate new peer..."); + logger.info("[MultiLeaderConsensus] activate new peer..."); impl.activePeer(peer); } catch (ConsensusGroupAddPeerException e) { @@ -293,9 +293,12 @@ public class MultiLeaderConsensus implements IConsensus { try { impl.notifyPeersToRemoveSyncLogChannel(peer); } catch (ConsensusGroupAddPeerException e) { - throw new RuntimeException(e); + return ConsensusGenericResponse.newBuilder() + .setSuccess(false) + .setException(new ConsensusException(e.getMessage())) + .build(); } - return ConsensusGenericResponse.newBuilder().setSuccess(false).build(); + return ConsensusGenericResponse.newBuilder().setSuccess(true).build(); } @Override diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java index b260da253b..b4054fb03e 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java @@ -334,11 +334,11 @@ public class MultiLeaderServerImpl { // configuration List<Peer> currentMembers = new ArrayList<>(this.configuration); logger.info( - "notify current peers to build sync log. group member: {}, target: {}", + "[MultiLeaderConsensus] notify current peers to build sync log. group member: {}, target: {}", currentMembers, targetPeer); for (Peer peer : currentMembers) { - logger.info("build sync log channel from {}", peer); + logger.info("[MultiLeaderConsensus] build sync log channel from {}", peer); if (peer.equals(thisNode)) { // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the // snapshot produced by thisNode @@ -410,24 +410,26 @@ public class MultiLeaderServerImpl { throws ConsensusGroupAddPeerException { // step 1, build sync channel in LogDispatcher logger.info( - "start to build sync log channel to {} with initialSyncIndex {}", + "[MultiLeaderConsensus] build sync log channel to {} with initialSyncIndex {}", targetPeer, initialSyncIndex); logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex); - logger.info( - "[complete] add LogDispatcherThread to {} with initialSyncIndex {}", - targetPeer, - initialSyncIndex); // step 2, update configuration configuration.add(targetPeer); + logger.info("[MultiLeaderConsensus] persist new configuration: {}", configuration); persistConfigurationUpdate(); - logger.info("[complete] persist new configuration: {}", configuration); } public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException { - logDispatcher.removeLogDispatcherThread(targetPeer); - configuration.remove(targetPeer); - persistConfigurationUpdate(); + try { + logDispatcher.removeLogDispatcherThread(targetPeer); + logger.info("[MultiLeaderConsensus] log dispatcher to {} removed and cleanup", targetPeer); + configuration.remove(targetPeer); + persistConfigurationUpdate(); + logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration); + } catch (IOException e) { + throw new ConsensusGroupAddPeerException("error when remove LogDispatcherThread", e); + } } public void persistConfiguration() { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java index 26f2a9c002..3c65d0c879 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java @@ -101,11 +101,17 @@ public class IndexController { try { if (oldFile.exists()) { FileUtils.moveFile(oldFile, newFile); + logger.info( + "version file updated, previous: {}, current: {}", + oldFile.getAbsolutePath(), + newFile.getAbsolutePath()); + } else { + // In the normal state, this branch should not be triggered. + logger.error( + "failed to flush sync index. cannot find previous version file. previous: {}", + lastFlushedIndex); } - logger.info( - "Version file updated, previous: {}, current: {}", - oldFile.getAbsolutePath(), - newFile.getAbsolutePath()); + lastFlushedIndex = flushIndex; } catch (IOException e) { logger.error("Error occurred when flushing next version", e); @@ -143,6 +149,7 @@ public class IndexController { versionFile = new File(directory, prefix + initialIndex); try { Files.createFile(versionFile.toPath()); + lastFlushedIndex = initialIndex; } catch (IOException e) { // TODO: (xingtanzjr) we need to handle the situation that file creation failed. // Or the dispatcher won't run correctly @@ -150,4 +157,14 @@ public class IndexController { } } } + + public void cleanupVersionFiles() throws IOException { + File directory = new File(storageDir); + File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix)); + if (versionFiles != null && versionFiles.length > 0) { + for (File versionFile : versionFiles) { + Files.delete(versionFile.toPath()); + } + } + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 1958627801..c5491cb702 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -118,7 +118,7 @@ public class LogDispatcher { executorService.submit(thread); } - public synchronized void removeLogDispatcherThread(Peer peer) { + public synchronized void removeLogDispatcherThread(Peer peer) throws IOException { if (stopped) { return; } @@ -133,6 +133,7 @@ public class LogDispatcher { return; } threads.get(threadIndex).stop(); + threads.get(threadIndex).cleanup(); threads.remove(threadIndex); } @@ -222,6 +223,10 @@ public class LogDispatcher { stopped = true; } + public void cleanup() throws IOException { + this.controller.cleanupVersionFiles(); + } + public boolean isStopped() { return stopped; } diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index c53bb069a7..bd0218d92e 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -30,6 +30,9 @@ import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor; import org.apache.iotdb.db.engine.StorageEngineV2; +import org.apache.iotdb.db.engine.cache.BloomFilterCache; +import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; import org.apache.iotdb.db.engine.storagegroup.DataRegion; @@ -124,6 +127,9 @@ public class DataRegionStateMachine extends BaseStateMachine { try { StorageEngineV2.getInstance() .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region); + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); + BloomFilterCache.getInstance().clear(); } catch (Exception e) { logger.error("Exception occurs when replacing data region in storage engine.", e); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java index cff5886f38..8c96b2b693 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -115,6 +115,12 @@ public class SnapshotLoader { private DataRegion loadSnapshotWithoutLog() { try { LOGGER.info("Moving snapshot file to data dirs"); + try { + deleteAllFilesInDataDirs(); + LOGGER.info("Remove all data files in original data dir"); + } catch (IOException e) { + return null; + } createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath)); return loadSnapshot(); } catch (IOException | DiskSpaceInsufficientException e) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java index d5c73f6822..800d1447d8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -166,7 +166,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) { for (TEndPoint endPoint : queryContext.getEndPointBlackList()) { - if (endPoint.getIp().equals(dataNodeLocation.internalEndPoint.getIp())) { + if (endPoint.equals(dataNodeLocation.internalEndPoint)) { return false; } }
