This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 96e7384  RATIS-1500. Fix LogAppender NN_NAKED_NOTIFY. (#588)
96e7384 is described below

commit 96e73841f2004edc359893148df4aa027af5d01d
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Jan 25 04:03:41 2022 +0800

    RATIS-1500. Fix LogAppender NN_NAKED_NOTIFY. (#588)
---
 .../java/org/apache/ratis/util/AwaitForSignal.java | 74 ++++++++++++++++++++++
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 43 +++++--------
 .../apache/ratis/server/leader/LogAppender.java    | 12 ++--
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 +-
 .../ratis/server/leader/LogAppenderBase.java       |  8 +++
 .../ratis/server/leader/LogAppenderDefault.java    |  8 +--
 6 files changed, 107 insertions(+), 40 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java 
b/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java
new file mode 100644
index 0000000..5294818
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class is a partial implementation of {@link 
java.util.concurrent.locks.Condition}.
+ * Only some of the await and signal methods are implemented.
+ *
+ * This class is threadsafe.
+ */
+public class AwaitForSignal {
+  private final String name;
+
+  private final AtomicReference<CompletableFuture<Void>> future = new 
AtomicReference<>(new CompletableFuture<>());
+
+  public AwaitForSignal(Object name) {
+    this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
+  }
+
+  /** The same as {@link java.util.concurrent.locks.Condition#await()} */
+  public void await() throws InterruptedException {
+    try {
+      future.get().get();
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /** The same as {@link java.util.concurrent.locks.Condition#await(long, 
TimeUnit)} */
+  public boolean await(long time, TimeUnit unit) throws InterruptedException {
+    if (time <= 0) {
+      return false;
+    }
+    try {
+      future.get().get(time, unit);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(e);
+    } catch (TimeoutException ignored) {
+      return false;
+    }
+    return true;
+  }
+
+  /** The same as {@link java.util.concurrent.locks.Condition#signal()} */
+  public void signal() {
+    future.getAndSet(new CompletableFuture<>()).complete(null);
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
\ No newline at end of file
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 990d06f..bb996c2 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.codahale.metrics.Timer;
@@ -164,19 +165,11 @@ public class GrpcLogAppender extends LogAppenderBase {
 
   private void mayWait() {
     // use lastSend time instead of lastResponse time
-    final long waitTimeMs = getWaitTimeMs();
-    if (waitTimeMs <= 0L) {
-      return;
-    }
-
-    synchronized(this) {
-      try {
-        LOG.trace("{}: wait {}ms", this, waitTimeMs);
-        wait(waitTimeMs);
-      } catch (InterruptedException ie) {
-        LOG.warn(this + ": Wait interrupted by " + ie);
-        Thread.currentThread().interrupt();
-      }
+    try {
+      getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ie) {
+      LOG.warn(this + ": Wait interrupted by " + ie);
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -526,13 +519,11 @@ public class GrpcLogAppender extends LogAppenderBase {
       return;
     }
 
-    synchronized (this) {
-      while (isRunning() && !responseHandler.isDone()) {
-        try {
-          wait();
-        } catch (InterruptedException ignored) {
-          Thread.currentThread().interrupt();
-        }
+    while (isRunning() && !responseHandler.isDone()) {
+      try {
+        getEventAwaitForSignal().await();
+      } catch (InterruptedException ignored) {
+        Thread.currentThread().interrupt();
       }
     }
 
@@ -571,13 +562,11 @@ public class GrpcLogAppender extends LogAppenderBase {
       return;
     }
 
-    synchronized (this) {
-      while (isRunning() && !responseHandler.isDone()) {
-        try {
-          wait();
-        } catch (InterruptedException ignored) {
-          Thread.currentThread().interrupt();
-        }
+    while (isRunning() && !responseHandler.isDone()) {
+      try {
+        getEventAwaitForSignal().await();
+      } catch (InterruptedException ignored) {
+        Thread.currentThread().interrupt();
       }
     }
   }
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 4ec13eb..135b431 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.server.leader;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -27,6 +26,7 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.AwaitForSignal;
 import org.apache.ratis.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,16 +137,16 @@ public interface LogAppender {
   void run() throws InterruptedException, IOException;
 
   /**
-   * Similar to {@link #notify()}, wake up this {@link LogAppender} for an 
event, which can be:
+   * Get the {@link AwaitForSignal} for events, which can be:
    * (1) new log entries available,
    * (2) log indices changed, or
    * (3) a snapshot installation completed.
    */
-  @SuppressFBWarnings("NN_NAKED_NOTIFY")
+  AwaitForSignal getEventAwaitForSignal();
+
+  /** The same as getEventAwaitForSignal().signal(). */
   default void notifyLogAppender() {
-    synchronized (this) {
-      notify();
-    }
+    getEventAwaitForSignal().signal();
   }
 
   /** Should the leader send appendEntries RPC to the follower? */
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3181625..97da41b 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -201,7 +201,7 @@ class LeaderStateImpl implements LeaderState {
     }
 
     void forEach(Consumer<LogAppender> action) {
-      senders.parallelStream().forEach(action);
+      senders.forEach(action);
     }
 
     void addAll(Collection<LogAppender> newSenders) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index c2ca24b..190f45b 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.AwaitForSignal;
 import org.apache.ratis.util.DataQueue;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -50,6 +51,7 @@ public abstract class LogAppenderBase implements LogAppender {
   private final int snapshotChunkMaxSize;
 
   private final LogAppenderDaemon daemon;
+  private final AwaitForSignal eventAwaitForSignal;
 
   protected LogAppenderBase(RaftServer.Division server, LeaderState 
leaderState, FollowerInfo f) {
     this.follower = f;
@@ -64,6 +66,12 @@ public abstract class LogAppenderBase implements LogAppender 
{
     final int bufferElementLimit = 
RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
     this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, 
EntryWithData::getSerializedSize);
     this.daemon = new LogAppenderDaemon(this);
+    this.eventAwaitForSignal = new AwaitForSignal(name);
+  }
+
+  @Override
+  public AwaitForSignal getEventAwaitForSignal() {
+    return eventAwaitForSignal;
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 06937f8..c9d3414 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -31,6 +31,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The default implementation of {@link LogAppender}
@@ -145,12 +146,7 @@ class LogAppenderDefault extends LogAppenderBase {
         }
       }
       if (isRunning() && !hasAppendEntries()) {
-        final long waitTime = getHeartbeatWaitTimeMs();
-        if (waitTime > 0) {
-          synchronized (this) {
-            wait(waitTime);
-          }
-        }
+        getEventAwaitForSignal().await(getHeartbeatWaitTimeMs(), 
TimeUnit.MILLISECONDS);
       }
       getLeaderState().checkHealth(getFollower());
     }

Reply via email to