This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e136f391 RATIS-1893. In SegmentedRaftLogCache, start a daemon thread 
to checkAndEvictCache. (#924)
0e136f391 is described below

commit 0e136f39123dc65a07a41c7146ea0e91f0fe1fa7
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Sep 23 19:23:16 2023 -0700

    RATIS-1893. In SegmentedRaftLogCache, start a daemon thread to 
checkAndEvictCache. (#924)
---
 .../java/org/apache/ratis/util/AwaitForSignal.java |   3 +
 .../java/org/apache/ratis/util/AwaitToRun.java     | 113 +++++++++++++++++++++
 .../server/raftlog/segmented/SegmentedRaftLog.java |  34 ++++---
 3 files changed, 137 insertions(+), 13 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java 
b/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java
index 88808c52b..5ae0bbc7f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java
@@ -59,6 +59,9 @@ public class AwaitForSignal {
     }
     lock.lock();
     try {
+      if (signaled.get().get()) {
+        return true;
+      }
       return condition.await(time, unit);
     } finally {
       lock.unlock();
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java 
b/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java
new file mode 100644
index 000000000..7eedef915
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Use a {@link Daemon}, which repeatedly waits for a signal
+ * (see {@link #signal()}) and then runs the given method once per wake-up.
+ * A failure of the method is logged and the daemon keeps waiting;
+ * {@link #close()} interrupts the daemon and waits for it to terminate.
+ * <p> This class is threadsafe.
+ * @see AwaitForSignal
+ */
+public class AwaitToRun implements AutoCloseable {
+  public static final Logger LOG = LoggerFactory.getLogger(AwaitToRun.class);
+
+  private final class RunnableImpl implements Runnable {  // the loop executed by the daemon
+    private final Runnable runMethod;
+
+    private RunnableImpl(Runnable runMethod) {
+      this.runMethod = runMethod;
+    }
+
+    @Override
+    public void run() {
+      for (; ; ) {  // the only exit is the interrupt handling below
+        try {
+          awaitForSignal.await();
+        } catch (InterruptedException e) {
+          LOG.info("{} is interrupted", awaitForSignal);
+          Thread.currentThread().interrupt();  // restore the interrupt status before exiting
+          return;
+        }
+
+        try {
+          runMethod.run();
+        } catch (Throwable t) {  // log and keep looping so that one failure does not stop the daemon
+          LOG.error(name + ": runMethod failed", t);
+        }
+      }
+    }
+  }
+
+  private final String name;
+  private final AwaitForSignal awaitForSignal;
+  private final AtomicReference<Daemon> daemon;  // set to null by close()
+
+  public AwaitToRun(Object namePrefix, Runnable runMethod) {
+    this.name = namePrefix + "-" + JavaUtils.getClassSimpleName(getClass());
+    this.awaitForSignal = new AwaitForSignal(name);
+    this.daemon = new AtomicReference<>(Daemon.newBuilder()  // built here but not started until start()
+        .setName(name)
+        .setRunnable(new RunnableImpl(runMethod))
+        .build());
+  }
+
+  /** Start the daemon, similar to {@link Thread#start()}; only log a warning if already closed. */
+  public AwaitToRun start() {
+    final Daemon d = daemon.get();
+    if (d != null) {
+      d.start();
+      LOG.info("{} started", d);
+    } else {
+      LOG.warn("{} is already closed", name);
+    }
+    return this;  // allow chaining, e.g. new AwaitToRun(..).start()
+  }
+
+  /** Signal the daemon to run the method. */
+  public void signal() {
+    awaitForSignal.signal();
+  }
+
+  @Override
+  public void close() {
+    final Daemon d = daemon.getAndSet(null);  // atomically take the daemon so that only the first close() proceeds
+    if (d == null) {
+      return;
+    }
+
+    d.interrupt();
+    try {
+      d.join();
+    } catch (InterruptedException e) {
+      LOG.warn(d + ": join is interrupted", e);
+      Thread.currentThread().interrupt();  // preserve the caller's interrupt status
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 985652437..255bec291 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -36,6 +36,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.AwaitToRun;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.StringUtils;
@@ -183,6 +184,7 @@ public class SegmentedRaftLog extends RaftLogBase {
   private final RaftStorage storage;
   private final StateMachine stateMachine;
   private final SegmentedRaftLogCache cache;
+  private final AwaitToRun cacheEviction;
   private final SegmentedRaftLogWorker fileLogWorker;
   private final long segmentMaxSize;
   private final boolean stateMachineCachingEnabled;
@@ -200,6 +202,7 @@ public class SegmentedRaftLog extends RaftLogBase {
     this.stateMachine = stateMachine;
     segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.cache = new SegmentedRaftLogCache(memberId, storage, properties, 
getRaftLogMetrics());
+    this.cacheEviction = new AwaitToRun(memberId + "-cacheEviction", 
this::checkAndEvictCache).start();
     this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
         submitUpdateCommitEvent, server, storage, properties, 
getRaftLogMetrics());
     stateMachineCachingEnabled = 
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
@@ -278,7 +281,7 @@ public class SegmentedRaftLog extends RaftLogBase {
 
     // the entry is not in the segment's cache. Load the cache without holding 
the lock.
     getRaftLogMetrics().onRaftLogCacheMiss();
-    checkAndEvictCache();
+    cacheEviction.signal();
     return segment.loadCache(record);
   }
 
@@ -382,26 +385,29 @@ public class SegmentedRaftLog extends RaftLogBase {
       final Timekeeper.Context appendEntryTimerContext = 
getRaftLogMetrics().startAppendEntryTimer();
       validateLogEntry(entry);
       final LogSegment currentOpenSegment = cache.getOpenSegment();
+      boolean rollOpenSegment = false;
       if (currentOpenSegment == null) {
         cache.addOpenSegment(entry.getIndex());
         fileLogWorker.startLogSegment(entry.getIndex());
       } else if (isSegmentFull(currentOpenSegment, entry)) {
+        rollOpenSegment = true;
+      } else {
+        final TermIndex last = currentOpenSegment.getLastTermIndex();
+        if (last != null && last.getTerm() != entry.getTerm()) {
+          // the term changes
+          Preconditions.assertTrue(last.getTerm() < entry.getTerm(),
+              "open segment's term %s is larger than the new entry's term %s",
+              last.getTerm(), entry.getTerm());
+          rollOpenSegment = true;
+        }
+      }
+
+      if (rollOpenSegment) {
         cache.rollOpenSegment(true);
         fileLogWorker.rollLogSegment(currentOpenSegment);
-      } else if (currentOpenSegment.numOfEntries() > 0 &&
-          currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
-        // the term changes
-        final long currentTerm = 
currentOpenSegment.getLastTermIndex().getTerm();
-        Preconditions.assertTrue(currentTerm < entry.getTerm(),
-            "open segment's term %s is larger than the new entry's term %s",
-            currentTerm, entry.getTerm());
-        cache.rollOpenSegment(true);
-        fileLogWorker.rollLogSegment(currentOpenSegment);
+        cacheEviction.signal();
       }
 
-      //TODO(runzhiwang): If there is performance problem, start a daemon 
thread to checkAndEvictCache
-      checkAndEvictCache();
-
       // If the entry has state machine data, then the entry should be inserted
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
@@ -496,6 +502,7 @@ public class SegmentedRaftLog extends RaftLogBase {
       if (openSegment.getEndIndex() <= lastSnapshotIndex) {
         fileLogWorker.closeLogSegment(openSegment);
         cache.rollOpenSegment(false);
+        cacheEviction.signal();
       }
     }
     return purgeImpl(lastSnapshotIndex);
@@ -505,6 +512,7 @@ public class SegmentedRaftLog extends RaftLogBase {
   public void close() throws IOException {
     try(AutoCloseableLock writeLock = writeLock()) {
       super.close();
+      cacheEviction.close();
       cache.close();
     }
     fileLogWorker.close();

Reply via email to