This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira5324 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8b40073a5cd0ec44bc9b6886bef87d06958f48f9 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Jan 11 18:46:32 2023 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../apache/iotdb/consensus/iot/IoTConsensus.java | 1 + .../consensus/iot/IoTConsensusServerImpl.java | 48 +++++++++++------ .../consensus/iot/IoTConsensusServerMetrics.java | 2 +- .../consensus/iot/logdispatcher/LogDispatcher.java | 2 +- .../service/IoTConsensusRPCServiceProcessor.java | 2 +- .../apache/iotdb/consensus/iot/ReplicateTest.java | 32 ++++++------ .../test/java/org/apache/iotdb/db/auth/ATest.java | 60 ++++++++++++++++++++++ 7 files changed, 113 insertions(+), 34 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 5135d384a2..5cfe53459d 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -262,6 +262,7 @@ public class IoTConsensus implements IConsensus { // step 2: notify all the other Peers to build the sync connection to newPeer logger.info("[IoTConsensus] notify current peers to build sync log..."); + impl.checkAndLockSafeDeletedSearchIndex(); impl.notifyPeersToBuildSyncLogChannel(peer); // step 3: take snapshot diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 1d668c281a..fae9c043b1 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -98,7 +98,7 @@ public class IoTConsensusServerImpl { private final Condition stateMachineCondition = stateMachineLock.newCondition(); private final String storageDir; private final List<Peer> configuration; - private final AtomicLong index; + private final AtomicLong searchIndex; private final LogDispatcher logDispatcher; private final IoTConsensusConfig config; private final ConsensusReqReader reader; @@ -132,11 +132,8 @@ public class IoTConsensusServerImpl { this.logDispatcher = new LogDispatcher(this, clientManager); reader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan()); long currentSearchIndex = reader.getCurrentSearchIndex(); - if (1 == configuration.size()) { - // only one configuration means single replica. - reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE); - } - this.index = new AtomicLong(currentSearchIndex); + checkAndUpdateSafeDeletedSearchIndex(); + this.searchIndex = new AtomicLong(currentSearchIndex); this.consensusGroupId = thisNode.getGroupId().toString(); this.metrics = new IoTConsensusServerMetrics(this); } @@ -180,7 +177,7 @@ public class IoTConsensusServerImpl { if (needBlockWrite()) { logger.info( "[Throttle Down] index:{}, safeIndex:{}", - getIndex(), + getSearchIndex(), getCurrentSafelyDeletedSearchIndex()); try { boolean timeout = @@ -240,9 +237,9 @@ public class IoTConsensusServerImpl { // is not expected and will slow down the preparation speed for batch. // So we need to use the lock to ensure the `offer()` and `incrementAndGet()` are // in one transaction. - synchronized (index) { + synchronized (searchIndex) { logDispatcher.offer(indexedConsensusRequest); - index.incrementAndGet(); + searchIndex.incrementAndGet(); } // statistic the time of offering request into queue MetricService.getInstance() @@ -456,7 +453,7 @@ public class IoTConsensusServerImpl { if (peer.equals(thisNode)) { // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the // snapshot produced by thisNode - buildSyncLogChannel(targetPeer, index.get()); + buildSyncLogChannel(targetPeer, searchIndex.get()); } else { // use RPC to tell other peers to build sync log channel to target peer try (SyncIoTConsensusServiceClient client = @@ -592,6 +589,7 @@ public class IoTConsensusServerImpl { logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", targetPeer); // step 2, update configuration configuration.remove(targetPeer); + checkAndUpdateSafeDeletedSearchIndex(); // step 3, persist configuration persistConfigurationUpdate(); logger.info("[IoTConsensus] configuration updated to {}", this.configuration); @@ -668,7 +666,7 @@ public class IoTConsensusServerImpl { public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest( IConsensusRequest request) { - return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request)); + return new IndexedConsensusRequest(searchIndex.get() + 1, Collections.singletonList(request)); } public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest( @@ -682,7 +680,7 @@ public class IoTConsensusServerImpl { * single copies, the current index is selected */ public long getCurrentSafelyDeletedSearchIndex() { - return logDispatcher.getMinSyncIndex().orElseGet(index::get); + return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get); } public String getStorageDir() { @@ -697,8 +695,8 @@ public class IoTConsensusServerImpl { return configuration; } - public long getIndex() { - return index.get(); + public long getSearchIndex() { + return searchIndex.get(); } public IoTConsensusConfig getConfig() { @@ -723,7 +721,7 @@ public class IoTConsensusServerImpl { } public AtomicLong getIndexObject() { - return index; + return searchIndex; } public boolean isReadOnly() { @@ -768,4 +766,24 @@ public class IoTConsensusServerImpl { } } } + + /** + * We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data + * lost + */ + public void checkAndLockSafeDeletedSearchIndex() { + if (configuration.size() == 1) { + reader.setSafelyDeletedSearchIndex(searchIndex.get()); + } + } + + /** + * only one configuration means single replica, then we can set safelyDeletedSearchIndex to + * Long.MAX_VALUE + */ + public void checkAndUpdateSafeDeletedSearchIndex() { + if (configuration.size() == 1) { + reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE); + } + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java index 88b25486b7..1e36e20b70 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java @@ -41,7 +41,7 @@ public class IoTConsensusServerMetrics implements IMetricSet { Metric.IOT_CONSENSUS.toString(), MetricLevel.IMPORTANT, impl, - IoTConsensusServerImpl::getIndex, + IoTConsensusServerImpl::getSearchIndex, Tag.NAME.toString(), "ioTConsensusServerImpl", Tag.REGION.toString(), diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 752a392ed1..93de711b76 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -344,7 +344,7 @@ public class LogDispatcher { long startIndex = syncStatus.getNextSendingIndex(); long maxIndex; synchronized (impl.getIndexObject()) { - maxIndex = impl.getIndex() + 1; + maxIndex = impl.getSearchIndex() + 1; logger.debug( "{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}", impl.getThisNode().getGroupId(), diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index f966528cdc..6b24f596a2 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -236,7 +236,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0)); return; } - long searchIndex = impl.getIndex(); + long searchIndex = impl.getSearchIndex(); long safeIndex = impl.getCurrentSafelyDeletedSearchIndex(); resultHandler.onComplete( new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex)); diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index e1675e7994..a36db6e3ea 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -120,17 +120,17 @@ public class ReplicateTest { servers.get(1).createPeer(group.getGroupId(), group.getPeers()); servers.get(2).createPeer(group.getGroupId(), group.getPeers()); - Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex()); - Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex()); + Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex()); + Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex()); for (int i = 0; i < CHECK_POINT_GAP; i++) { servers.get(0).write(gid, new TestEntry(i, peers.get(0))); servers.get(1).write(gid, new TestEntry(i, peers.get(1))); servers.get(2).write(gid, new TestEntry(i, peers.get(2))); - Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex()); - Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex()); + Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex()); + Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getSearchIndex()); } for (int i = 0; i < 3; i++) { @@ -163,9 +163,9 @@ public class ReplicateTest { Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration()); Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(2).getImpl(gid).getSearchIndex()); for (int i = 0; i < 3; i++) { long start = System.currentTimeMillis(); @@ -197,14 +197,14 @@ public class ReplicateTest { servers.get(0).createPeer(group.getGroupId(), group.getPeers()); servers.get(1).createPeer(group.getGroupId(), group.getPeers()); - Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex()); + Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex()); for (int i = 0; i < CHECK_POINT_GAP; i++) { servers.get(0).write(gid, new TestEntry(i, peers.get(0))); servers.get(1).write(gid, new TestEntry(i, peers.get(1))); - Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex()); + Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex()); } Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex()); @@ -219,9 +219,9 @@ public class ReplicateTest { Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration()); Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getIndex()); - Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getIndex()); - Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex()); + Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex()); + Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex()); for (int i = 0; i < 2; i++) { long start = System.currentTimeMillis(); diff --git a/server/src/test/java/org/apache/iotdb/db/auth/ATest.java b/server/src/test/java/org/apache/iotdb/db/auth/ATest.java new file mode 100644 index 0000000000..1994a53176 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/auth/ATest.java @@ -0,0 +1,60 @@ +package org.apache.iotdb.db.auth; + +import org.junit.Test; +import org.testcontainers.shaded.com.google.common.base.Stopwatch; + +import java.time.Instant; +import java.util.Calendar; +import java.util.Date; + +public class ATest { + @Test + public void testSystemCurrentTime() { + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (int i = 0; i < 1_0000_000; i++) { + System.currentTimeMillis(); + } + stopwatch.stop(); + System.out.println("System.currentTimeMillis(): " + stopwatch); + } + + @Test + public void testDateTime() { + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (int i = 0; i < 1_0000_000; i++) { + (new Date()).getTime(); + } + stopwatch.stop(); + System.out.println("(new Date()).getTime(): " + stopwatch); + } + + @Test + public void testCalendarTime() { + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (int i = 0; i < 1_0000_000; i++) { + Calendar.getInstance().getTimeInMillis(); + } + stopwatch.stop(); + System.out.println("Calendar.getInstance().getTimeInMillis(): " + stopwatch); + } + + @Test + public void testInstantNow() { + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (int i = 0; i < 1_0000_000; i++) { + Instant.now(); + } + stopwatch.stop(); + System.out.println("Instant.now(): " + stopwatch); + } + + @Test + public void testNanoTimeNow() { + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (int i = 0; i < 1_0000_000; i++) { + System.nanoTime(); + } + stopwatch.stop(); + System.out.println("nanoTime(): " + stopwatch); + } +}
