This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5324
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8b40073a5cd0ec44bc9b6886bef87d06958f48f9
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Wed Jan 11 18:46:32 2023 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  1 +
 .../consensus/iot/IoTConsensusServerImpl.java      | 48 +++++++++++------
 .../consensus/iot/IoTConsensusServerMetrics.java   |  2 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |  2 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |  2 +-
 .../apache/iotdb/consensus/iot/ReplicateTest.java  | 32 ++++++------
 .../test/java/org/apache/iotdb/db/auth/ATest.java  | 60 ++++++++++++++++++++++
 7 files changed, 113 insertions(+), 34 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 5135d384a2..5cfe53459d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -262,6 +262,7 @@ public class IoTConsensus implements IConsensus {
 
       // step 2: notify all the other Peers to build the sync connection to 
newPeer
       logger.info("[IoTConsensus] notify current peers to build sync log...");
+      impl.checkAndLockSafeDeletedSearchIndex();
       impl.notifyPeersToBuildSyncLogChannel(peer);
 
       // step 3: take snapshot
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..fae9c043b1 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,7 +98,7 @@ public class IoTConsensusServerImpl {
   private final Condition stateMachineCondition = 
stateMachineLock.newCondition();
   private final String storageDir;
   private final List<Peer> configuration;
-  private final AtomicLong index;
+  private final AtomicLong searchIndex;
   private final LogDispatcher logDispatcher;
   private final IoTConsensusConfig config;
   private final ConsensusReqReader reader;
@@ -132,11 +132,8 @@ public class IoTConsensusServerImpl {
     this.logDispatcher = new LogDispatcher(this, clientManager);
     reader = (ConsensusReqReader) stateMachine.read(new 
GetConsensusReqReaderPlan());
     long currentSearchIndex = reader.getCurrentSearchIndex();
-    if (1 == configuration.size()) {
-      // only one configuration means single replica.
-      reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
-    }
-    this.index = new AtomicLong(currentSearchIndex);
+    checkAndUpdateSafeDeletedSearchIndex();
+    this.searchIndex = new AtomicLong(currentSearchIndex);
     this.consensusGroupId = thisNode.getGroupId().toString();
     this.metrics = new IoTConsensusServerMetrics(this);
   }
@@ -180,7 +177,7 @@ public class IoTConsensusServerImpl {
       if (needBlockWrite()) {
         logger.info(
             "[Throttle Down] index:{}, safeIndex:{}",
-            getIndex(),
+            getSearchIndex(),
             getCurrentSafelyDeletedSearchIndex());
         try {
           boolean timeout =
@@ -240,9 +237,9 @@ public class IoTConsensusServerImpl {
         // is not expected and will slow down the preparation speed for batch.
         // So we need to use the lock to ensure the `offer()` and 
`incrementAndGet()` are
         // in one transaction.
-        synchronized (index) {
+        synchronized (searchIndex) {
           logDispatcher.offer(indexedConsensusRequest);
-          index.incrementAndGet();
+          searchIndex.incrementAndGet();
         }
         // statistic the time of offering request into queue
         MetricService.getInstance()
@@ -456,7 +453,7 @@ public class IoTConsensusServerImpl {
       if (peer.equals(thisNode)) {
         // use searchIndex for thisNode as the initialSyncIndex because 
targetPeer will load the
         // snapshot produced by thisNode
-        buildSyncLogChannel(targetPeer, index.get());
+        buildSyncLogChannel(targetPeer, searchIndex.get());
       } else {
         // use RPC to tell other peers to build sync log channel to target peer
         try (SyncIoTConsensusServiceClient client =
@@ -592,6 +589,7 @@ public class IoTConsensusServerImpl {
       logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", 
targetPeer);
       // step 2, update configuration
       configuration.remove(targetPeer);
+      checkAndUpdateSafeDeletedSearchIndex();
       // step 3, persist configuration
       persistConfigurationUpdate();
       logger.info("[IoTConsensus] configuration updated to {}", 
this.configuration);
@@ -668,7 +666,7 @@ public class IoTConsensusServerImpl {
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
-    return new IndexedConsensusRequest(index.get() + 1, 
Collections.singletonList(request));
+    return new IndexedConsensusRequest(searchIndex.get() + 1, 
Collections.singletonList(request));
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
@@ -682,7 +680,7 @@ public class IoTConsensusServerImpl {
    * single copies, the current index is selected
    */
   public long getCurrentSafelyDeletedSearchIndex() {
-    return logDispatcher.getMinSyncIndex().orElseGet(index::get);
+    return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
   }
 
   public String getStorageDir() {
@@ -697,8 +695,8 @@ public class IoTConsensusServerImpl {
     return configuration;
   }
 
-  public long getIndex() {
-    return index.get();
+  public long getSearchIndex() {
+    return searchIndex.get();
   }
 
   public IoTConsensusConfig getConfig() {
@@ -723,7 +721,7 @@ public class IoTConsensusServerImpl {
   }
 
   public AtomicLong getIndexObject() {
-    return index;
+    return searchIndex;
   }
 
   public boolean isReadOnly() {
@@ -768,4 +766,24 @@ public class IoTConsensusServerImpl {
       }
     }
   }
+
+  /**
+   * We should set safelyDeletedSearchIndex to searchIndex before addPeer to 
avoid potential data
+   * lost
+   */
+  public void checkAndLockSafeDeletedSearchIndex() {
+    if (configuration.size() == 1) {
+      reader.setSafelyDeletedSearchIndex(searchIndex.get());
+    }
+  }
+
+  /**
+   * only one configuration means single replica, then we can set 
safelyDeletedSearchIndex to
+   * Long.MAX_VALUE
+   */
+  public void checkAndUpdateSafeDeletedSearchIndex() {
+    if (configuration.size() == 1) {
+      reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
+    }
+  }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 88b25486b7..1e36e20b70 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -41,7 +41,7 @@ public class IoTConsensusServerMetrics implements IMetricSet {
             Metric.IOT_CONSENSUS.toString(),
             MetricLevel.IMPORTANT,
             impl,
-            IoTConsensusServerImpl::getIndex,
+            IoTConsensusServerImpl::getSearchIndex,
             Tag.NAME.toString(),
             "ioTConsensusServerImpl",
             Tag.REGION.toString(),
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 752a392ed1..93de711b76 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -344,7 +344,7 @@ public class LogDispatcher {
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndex;
       synchronized (impl.getIndexObject()) {
-        maxIndex = impl.getIndex() + 1;
+        maxIndex = impl.getSearchIndex() + 1;
         logger.debug(
             "{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, 
bufferedEntries size: {}",
             impl.getThisNode().getGroupId(),
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index f966528cdc..6b24f596a2 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -236,7 +236,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
       return;
     }
-    long searchIndex = impl.getIndex();
+    long searchIndex = impl.getSearchIndex();
     long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
     resultHandler.onComplete(
         new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, 
safeIndex));
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java 
b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index e1675e7994..a36db6e3ea 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -120,17 +120,17 @@ public class ReplicateTest {
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
     servers.get(2).createPeer(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
       servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getSearchIndex());
     }
 
     for (int i = 0; i < 3; i++) {
@@ -163,9 +163,9 @@ public class ReplicateTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
@@ -197,14 +197,14 @@ public class ReplicateTest {
     servers.get(0).createPeer(group.getGroupId(), group.getPeers());
     servers.get(1).createPeer(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getSearchIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
     }
 
     Assert.assertEquals(0, 
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -219,9 +219,9 @@ public class ReplicateTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getSearchIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getSearchIndex());
 
     for (int i = 0; i < 2; i++) {
       long start = System.currentTimeMillis();
diff --git a/server/src/test/java/org/apache/iotdb/db/auth/ATest.java 
b/server/src/test/java/org/apache/iotdb/db/auth/ATest.java
new file mode 100644
index 0000000000..1994a53176
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/auth/ATest.java
@@ -0,0 +1,60 @@
+package org.apache.iotdb.db.auth;
+
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.base.Stopwatch;
+
+import java.time.Instant;
+import java.util.Calendar;
+import java.util.Date;
+
+public class ATest {
+  @Test
+  public void testSystemCurrentTime() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      System.currentTimeMillis();
+    }
+    stopwatch.stop();
+    System.out.println("System.currentTimeMillis(): " + stopwatch);
+  }
+
+  @Test
+  public void testDateTime() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      (new Date()).getTime();
+    }
+    stopwatch.stop();
+    System.out.println("(new Date()).getTime(): " + stopwatch);
+  }
+
+  @Test
+  public void testCalendarTime() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      Calendar.getInstance().getTimeInMillis();
+    }
+    stopwatch.stop();
+    System.out.println("Calendar.getInstance().getTimeInMillis(): " + 
stopwatch);
+  }
+
+  @Test
+  public void testInstantNow() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      Instant.now();
+    }
+    stopwatch.stop();
+    System.out.println("Instant.now(): " + stopwatch);
+  }
+
+  @Test
+  public void testNanoTimeNow() {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    for (int i = 0; i < 1_0000_000; i++) {
+      System.nanoTime();
+    }
+    stopwatch.stop();
+    System.out.println("nanoTime(): " + stopwatch);
+  }
+}

Reply via email to