This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d5cd3f6  RATIS-542. When a server restarts, purge(-1) causes IllegalStateException.
d5cd3f6 is described below

commit d5cd3f6c69082f42cb1a36d2c9a5a27f1d99a836
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Thu May 16 14:02:45 2019 -0700

    RATIS-542. When a server restarts, purge(-1) causes IllegalStateException.
---
 .../org/apache/ratis/server/impl/ServerState.java  |   1 -
 .../ratis/server/impl/StateMachineUpdater.java     | 191 ++++++++++++---------
 .../org/apache/ratis/server/raftlog/RaftLog.java   |   1 +
 .../apache/ratis/server/raftlog/RaftLogIndex.java  |   7 +
 .../ratis/server/raftlog/memory/MemoryRaftLog.java |   6 -
 .../apache/ratis/server/storage/RaftStorage.java   |  20 +--
 6 files changed, 117 insertions(+), 109 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 7827ea5..0c49716 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -158,7 +158,6 @@ public class ServerState implements Closeable {
   private long initStatemachine(StateMachine sm, RaftGroupId gid)
       throws IOException {
     sm.initialize(server.getProxy(), gid, storage);
-    storage.setStateMachineStorage(sm.getStateMachineStorage());
     SnapshotInfo snapshot = sm.getLatestSnapshot();
 
     if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 97fe4e4..d28c7ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,6 +22,8 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.*;
@@ -31,7 +33,11 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 /**
  * This class tracks the log entries that have been committed in a quorum and
@@ -44,38 +50,42 @@ import java.util.concurrent.CompletableFuture;
  */
 class StateMachineUpdater implements Runnable {
   static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
-  private volatile Long stopIndex = null;
 
   enum State {
     RUNNING, STOP, RELOAD
   }
 
-  private final RaftProperties properties;
+  private final Consumer<Object> infoIndexChange;
+  private final Consumer<Object> debugIndexChange;
+  private final String name;
+
   private final StateMachine stateMachine;
   private final RaftServerImpl server;
   private final RaftLog raftLog;
 
-  private volatile long lastAppliedIndex;
-
-  private final boolean autoSnapshotEnabled;
-  private final long autoSnapshotThreshold;
-  private long lastSnapshotIndex;
+  private final Long autoSnapshotThreshold;
 
   private final Thread updater;
+  private final RaftLogIndex appliedIndex;
+  private final RaftLogIndex snapshotIndex;
+  private final AtomicReference<Long> stopIndex = new AtomicReference<>();
   private volatile State state = State.RUNNING;
 
   StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
       RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
-    this.properties = properties;
+    this.name = getClass().getSimpleName() + ":" + raftLog.getSelfId() + ":" + 
server.getGroupId();
+    this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
+    this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
+
     this.stateMachine = stateMachine;
     this.server = server;
     this.raftLog = raftLog;
 
-    this.lastAppliedIndex = lastAppliedIndex;
-    lastSnapshotIndex = lastAppliedIndex;
+    this.appliedIndex = new RaftLogIndex("appliedIndex", lastAppliedIndex);
+    this.snapshotIndex = new RaftLogIndex("snapshotIndex", lastAppliedIndex);
 
-    autoSnapshotEnabled = 
RaftServerConfigKeys.Snapshot.autoTriggerEnabled(properties);
-    autoSnapshotThreshold = 
RaftServerConfigKeys.Snapshot.autoTriggerThreshold(properties);
+    final boolean autoSnapshot = 
RaftServerConfigKeys.Snapshot.autoTriggerEnabled(properties);
+    this.autoSnapshotThreshold = autoSnapshot? 
RaftServerConfigKeys.Snapshot.autoTriggerThreshold(properties): null;
     updater = new Daemon(this);
   }
 
@@ -87,25 +97,18 @@ class StateMachineUpdater implements Runnable {
     state = State.STOP;
     try {
       stateMachine.close();
-    } catch (IOException ignored) {
-      LOG.warn(server.getId() + ": Failed to close "
-          + stateMachine.getClass().getSimpleName()
-          + " " + stateMachine, ignored);
+    } catch(Throwable t) {
+      LOG.warn(name + ": Failed to close " + 
stateMachine.getClass().getSimpleName() + " " + stateMachine, t);
     }
   }
 
   /**
    * Stop the updater thread after all the committed transactions
    * have been applied to the state machine.
-   *
-   * @throws InterruptedException
    */
   void stopAndJoin() throws InterruptedException {
-    if (stopIndex == null) {
-      synchronized (this) {
-        this.stopIndex = raftLog.getLastCommittedIndex();
-        notifyUpdater();
-      }
+    if (stopIndex.compareAndSet(null, raftLog.getLastCommittedIndex())) {
+      notifyUpdater();
       LOG.info("{}: set stopIndex = {}", this, stopIndex);
     }
     updater.join();
@@ -122,76 +125,35 @@ class StateMachineUpdater implements Runnable {
 
   @Override
   public String toString() {
-    return this.getClass().getSimpleName() + "-" + raftLog.getSelfId() + "-" + 
server.getGroupId();
+    return name;
   }
 
   @Override
   public void run() {
-    while (isRunning()) {
+    for(; state != State.STOP; ) {
       try {
-        synchronized (this) {
-          // when the peers just start, the committedIndex is initialized as 0
-          // and will be updated only after the leader contacts other peers.
-          // Thus initially lastAppliedIndex can be greater than lastCommitted.
-          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()
-              && !shouldStop()) {
-            wait();
-          }
-        }
-
-        final long committedIndex = raftLog.getLastCommittedIndex();
-        Preconditions.assertTrue(lastAppliedIndex <= committedIndex);
+        waitForCommit();
 
         if (state == State.RELOAD) {
-          Preconditions.assertTrue(stateMachine.getLifeCycleState() == 
LifeCycle.State.PAUSED);
-
-          stateMachine.reinitialize();
-
-          SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
-          Preconditions.assertTrue(snapshot != null && snapshot.getIndex() > 
lastAppliedIndex,
-              "Snapshot: %s, lastAppliedIndex: %s", snapshot, 
lastAppliedIndex);
-
-          lastAppliedIndex = snapshot.getIndex();
-          lastSnapshotIndex = snapshot.getIndex();
-          state = State.RUNNING;
+          reload();
         }
 
-        final MemoizedSupplier<List<CompletableFuture<Message>>> futures
-            = MemoizedSupplier.valueOf(() -> new ArrayList<>());
-        while (lastAppliedIndex < committedIndex) {
-          final long nextIndex = lastAppliedIndex + 1;
-          final LogEntryProto next = raftLog.get(nextIndex);
-          if (next != null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("{}: applying nextIndex={}, nextLog={}",
-                  this, nextIndex, ServerProtoUtils.toString(next));
-            }
-            final CompletableFuture<Message> f = 
server.applyLogToStateMachine(next);
-            if (f != null) {
-              futures.get().add(f);
-            }
-            lastAppliedIndex = nextIndex;
-          } else {
-            LOG.debug("{}: logEntry {} is null. There may be snapshot to load. 
state:{}",
-                this, nextIndex, state);
-            break;
-          }
-        }
+        final MemoizedSupplier<List<CompletableFuture<Message>>> futures = 
applyLog();
 
         // check if need to trigger a snapshot
         if (shouldTakeSnapshot()) {
           if (futures.isInitialized()) {
             JavaUtils.allOf(futures.get()).get();
           }
-          lastSnapshotIndex = stateMachine.takeSnapshot();
-          raftLog.purge(lastSnapshotIndex);
+
+          takeSnapshot();
         }
 
         if (shouldStop()) {
           stop();
         }
       } catch (InterruptedException e) {
-        if (!isRunning()) {
+        if (state == State.STOP) {
           LOG.info("{}: the StateMachineUpdater is interrupted and will 
exit.", this);
         } else {
           final String s = this + ": the StateMachineUpdater is wrongly 
interrupted";
@@ -204,24 +166,87 @@ class StateMachineUpdater implements Runnable {
     }
   }
 
-  private boolean isRunning() {
-    return state != State.STOP;
+  private synchronized void waitForCommit() throws InterruptedException {
+    // When a peer starts, the committed is initialized to 0.
+    // It will be updated only after the leader contacts other peers.
+    // Thus it is possible to have applied > committed initially.
+    final long applied = getLastAppliedIndex();
+    for(; applied >= raftLog.getLastCommittedIndex() && state == State.RUNNING 
&& !shouldStop(); ) {
+      wait();
+    }
+  }
+
+  private void reload() throws IOException {
+    Preconditions.assertTrue(stateMachine.getLifeCycleState() == 
LifeCycle.State.PAUSED);
+
+    stateMachine.reinitialize();
+
+    final SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
+    Objects.requireNonNull(snapshot, "snapshot == null");
+    final long i = snapshot.getIndex();
+    snapshotIndex.setUnconditionally(i, infoIndexChange);
+    appliedIndex.setUnconditionally(i, infoIndexChange);
+    state = State.RUNNING;
+  }
+
+  private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws 
RaftLogIOException {
+    final MemoizedSupplier<List<CompletableFuture<Message>>> futures = 
MemoizedSupplier.valueOf(ArrayList::new);
+    final long committed = raftLog.getLastCommittedIndex();
+    for(long applied; (applied = getLastAppliedIndex()) < committed && state 
== State.RUNNING && !shouldStop(); ) {
+      final long nextIndex = applied + 1;
+      final LogEntryProto next = raftLog.get(nextIndex);
+      if (next != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{}: applying nextIndex={}, nextLog={}", this, nextIndex, 
ServerProtoUtils.toString(next));
+        } else {
+          LOG.debug("{}: applying nextIndex={}", this, nextIndex);
+        }
+
+        final CompletableFuture<Message> f = 
server.applyLogToStateMachine(next);
+        if (f != null) {
+          futures.get().add(f);
+        }
+        final long incremented = 
appliedIndex.incrementAndGet(debugIndexChange);
+        Preconditions.assertTrue(incremented == nextIndex);
+      } else {
+        LOG.debug("{}: logEntry {} is null. There may be snapshot to load. 
state:{}",
+            this, nextIndex, state);
+        break;
+      }
+    }
+    return futures;
+  }
+
+  private void takeSnapshot() {
+    final long i;
+    try {
+      i = stateMachine.takeSnapshot();
+    } catch (IOException e) {
+      LOG.error(name + ": Failed to take snapshot", e);
+      return;
+    }
+
+    if (i >= 0) {
+      LOG.info("{}: Took a snapshot at index {}", name, i);
+      snapshotIndex.updateIncreasingly(i, infoIndexChange);
+      raftLog.purge(i);
+    }
   }
 
   private boolean shouldStop() {
-    return stopIndex != null && getLastAppliedIndex() >= stopIndex;
+    return Optional.ofNullable(stopIndex.get()).filter(i -> i <= 
getLastAppliedIndex()).isPresent();
   }
 
   private boolean shouldTakeSnapshot() {
-    return autoSnapshotEnabled &&
-        ( ((state != State.RELOAD)
-            && (getLastAppliedIndex() - lastSnapshotIndex
-              >= autoSnapshotThreshold))
-          || shouldStop()
-        );
+    if (autoSnapshotThreshold == null) {
+      return false;
+    } else if (shouldStop()) {
+      return true;
+    }
+    return state == State.RUNNING && getLastAppliedIndex() - 
snapshotIndex.get() >= autoSnapshotThreshold;
   }
 
   long getLastAppliedIndex() {
-    return lastAppliedIndex;
+    return appliedIndex.get();
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index b7d2d5c..bc35014 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -319,6 +319,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
    * @param index - is inclusive.
    */
   public final CompletableFuture<Long> purge(long index) {
+    LOG.info("{}: purge {}", getName(), index);
     return purgeImpl(index);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
index 1c42d84..1241c5b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
@@ -71,6 +71,13 @@ public class RaftLogIndex {
     return updated;
   }
 
+  public long incrementAndGet(Consumer<Object> log) {
+    final long newIndex = index.incrementAndGet();
+    log.accept(StringUtils.stringSupplierAsObject(
+        () -> name + ": incrementAndGet " + (newIndex-1) + " -> " + newIndex));
+    return newIndex;
+  }
+
   @Override
   public String toString() {
     return name + ":" + index;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 3626755..bbe2e1c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -197,12 +197,6 @@ public class MemoryRaftLog extends RaftLog {
     }
   }
 
-  @Override
-  public String toString() {
-    return "last=" + getLastEntryTermIndex() + ", committed="
-        + ServerProtoUtils.toString(get(getLastCommittedIndex()));
-  }
-
   public String getEntryString() {
     return "entries=" + entries;
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index 35fcaaf..81a9de2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -22,8 +22,6 @@ import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
-import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +40,6 @@ public class RaftStorage implements Closeable {
   private final RaftStorageDirectory storageDir;
   private final StorageState state;
   private volatile MetaFile metaFile;
-  private StateMachineStorage stateMachineStorage;
 
   public RaftStorage(File dir, RaftServerConstants.StartupOption option)
       throws IOException {
@@ -140,23 +137,8 @@ public class RaftStorage implements Closeable {
     }
   }
 
-  public SnapshotInfo getLatestSnapshot() throws IOException {
-    return getStateMachineStorage().getLatestSnapshot();
-  }
-
-  /**
-   * Called by the state machine after it has initialized the 
StateMachineStorage.
-   */
-  public void setStateMachineStorage(StateMachineStorage smStorage) {
-    this.stateMachineStorage = smStorage;
-  }
-
-  public StateMachineStorage getStateMachineStorage() {
-    return stateMachineStorage;
-  }
-
   @Override
   public String toString() {
-    return getStorageDir() + "";
+    return getClass().getSimpleName() + ":" + getStorageDir();
   }
 }

Reply via email to