This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new af71d409a RATIS-1901. Update Counter example to benchmark performance. 
(#953)
af71d409a is described below

commit af71d409a2a3847a6cda602bba5fb995c1c8a529
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Oct 30 21:23:25 2023 -0700

    RATIS-1901. Update Counter example to benchmark performance. (#953)
---
 .../org/apache/ratis/util/ConcurrentUtils.java     |  29 +++-
 .../main/java/org/apache/ratis/util/JavaUtils.java |   9 ++
 .../apache/ratis/examples/common/Constants.java    |  44 ++++-
 .../ratis/examples/counter/CounterCommand.java     |   6 +
 .../examples/counter/client/CounterClient.java     | 178 ++++++++++++++-------
 .../examples/counter/server/CounterServer.java     |   2 +
 .../counter/server/CounterStateMachine.java        |  67 +++++---
 .../ratis/examples/membership/server/CServer.java  |   3 +-
 .../apache/ratis/examples/counter/TestCounter.java |   6 +-
 9 files changed, 241 insertions(+), 103 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index 372fa62bc..b6e2e9bf6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 /**
  * Utilities related to concurrent programming.
@@ -153,13 +154,14 @@ public interface ConcurrentUtils {
   }
 
   /**
-   * The same as collection.parallelStream().forEach(action) except that
+   * The same as {@link Collection#parallelStream()}.forEach(action) except 
that
    * (1) this method is asynchronous,
-   * (2) an executor can be passed to this method, and
+   * (2) this method has an executor parameter, and
    * (3) the action can throw a checked exception.
    *
-   * @param collection The given collection.
-   * @param action To act on each element in the collection.
+   * @param stream The stream to be processed.
+   * @param size The estimated size of the stream.
+   * @param action To act on each element in the stream.
    * @param executor To execute the action.
    * @param <E> The element type.
    * @param <THROWABLE> the exception type.
@@ -172,9 +174,9 @@ public interface ConcurrentUtils {
    * @see java.util.stream.Stream#forEach(Consumer)
    */
   static <E, THROWABLE extends Throwable> CompletableFuture<Void> 
parallelForEachAsync(
-      Collection<E> collection, CheckedConsumer<? super E, THROWABLE> action, 
Executor executor) {
-    final List<CompletableFuture<E>> futures = new 
ArrayList<>(collection.size());
-    collection.forEach(element -> {
+      Stream<E> stream, int size, CheckedConsumer<? super E, THROWABLE> 
action, Executor executor) {
+    final List<CompletableFuture<E>> futures = new ArrayList<>(size);
+    stream.forEach(element -> {
       final CompletableFuture<E> f = new CompletableFuture<>();
       futures.add(f);
       executor.execute(() -> accept(action, element, f));
@@ -182,6 +184,19 @@ public interface ConcurrentUtils {
     return JavaUtils.allOf(futures);
   }
 
+  /** The same as parallelForEachAsync(collection.stream(), collection.size(), 
action, executor). */
+  static <E, THROWABLE extends Throwable> CompletableFuture<Void> 
parallelForEachAsync(
+      Collection<E> collection, CheckedConsumer<? super E, THROWABLE> action, 
Executor executor) {
+    return parallelForEachAsync(collection.stream(), collection.size(), 
action, executor);
+  }
+
+  /** The same as parallelForEachAsync(collection.stream(), collection.size(), 
action, executor). */
+  static <THROWABLE extends Throwable> CompletableFuture<Void> 
parallelForEachAsync(
+      int size, CheckedConsumer<Integer, THROWABLE> action, Executor executor) 
{
+    final AtomicInteger i = new AtomicInteger();
+    return 
parallelForEachAsync(Stream.generate(i::getAndIncrement).limit(size), size, 
action, executor);
+  }
+
   static <E, THROWABLE extends Throwable> void accept(
       CheckedConsumer<? super E, THROWABLE> action, E element, 
CompletableFuture<E> f) {
     try {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 80162e101..ff6f0f93d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -152,6 +152,12 @@ public interface JavaUtils {
     return doPrivileged(() -> System.getProperty(key), () -> "get system 
property " + key);
   }
 
+  static String getEnv(String variable) {
+    final String value = System.getenv().get(variable);
+    LOG.info("ENV: {} = {}", variable, value);
+    return value;
+  }
+
   /**
    * Similar to {@link System#setProperty(String, String)}
    * except that this method may invoke {@link 
AccessController#doPrivileged(PrivilegedAction)}
@@ -275,6 +281,9 @@ public interface JavaUtils {
   }
 
   static <T> CompletableFuture<Void> allOf(Collection<CompletableFuture<T>> 
futures) {
+    if (futures == null || futures.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
     return 
CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY));
   }
 
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java 
b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
index c7559597e..ba80aeb4f 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
@@ -21,18 +21,22 @@ package org.apache.ratis.examples.common;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.Reader;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -40,22 +44,46 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Constants across servers and clients
  */
 public final class Constants {
+  private static final Logger LOG = LoggerFactory.getLogger(Constants.class);
+
+  private static final String CONF_FILE_NAME = "conf.properties";
+  private static final List<String> CONF_FILE_DEFAULTS = 
Collections.unmodifiableList(Arrays.asList(
+      "examples/conf/" + CONF_FILE_NAME, // for release tarball layout
+      "ratis-examples/src/main/resources/" + CONF_FILE_NAME)); // for source 
tree layout
+  private static final String CONF_FILE_ENV_VAR_NAME = "RATIS_EXAMPLE_CONF";
+
+  static Path getConfPath() {
+    final String env = JavaUtils.getEnv(CONF_FILE_ENV_VAR_NAME);
+    final Stream<String> s = Stream.concat(
+        Optional.ofNullable(env).map(Stream::of).orElseGet(Stream::empty),
+        CONF_FILE_DEFAULTS.stream());
+    for(final Iterator<String> i = s.iterator(); i.hasNext(); ) {
+      final Path p = Paths.get(i.next());
+      if (Files.exists(p)) {
+        LOG.info("Using conf file {}", p);
+        return p;
+      }
+    }
+    throw new IllegalArgumentException("Conf file not found: please set 
environment variable \""
+        + CONF_FILE_ENV_VAR_NAME + "\"");
+  }
+
   public static final List<RaftPeer> PEERS;
   public static final String PATH;
   public static final List<TimeDuration> SIMULATED_SLOWNESS;
 
   static {
     final Properties properties = new Properties();
-    final String conf = "ratis-examples/src/main/resources/conf.properties";
-    try(InputStream inputStream = new FileInputStream(conf);
-        Reader reader = new InputStreamReader(inputStream, 
StandardCharsets.UTF_8);
-        BufferedReader bufferedReader = new BufferedReader(reader)) {
-      properties.load(bufferedReader);
+    final Path conf = getConfPath();
+    try(BufferedReader in = new BufferedReader(new InputStreamReader(
+        Files.newInputStream(conf), StandardCharsets.UTF_8))) {
+      properties.load(in);
     } catch (IOException e) {
       throw new IllegalStateException("Failed to load " + conf, e);
     }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
index 843a158fb..905cc004b 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.examples.counter;
 
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 /**
  * The supported commands the Counter example.
@@ -38,4 +39,9 @@ public enum CounterCommand {
   public boolean matches(String command) {
     return name().equalsIgnoreCase(command);
   }
+
+  /** Does the given command string match this command? */
+  public boolean matches(ByteString command) {
+    return message.getContent().equals(command);
+  }
 }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index 7076809f5..cb6c72725 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -22,6 +22,13 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.examples.common.Constants;
 import org.apache.ratis.examples.counter.CounterCommand;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.ConcurrentUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -29,9 +36,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Counter client application, this application sends specific number of
@@ -42,81 +52,126 @@ import java.util.concurrent.Future;
  * parameter found, application use default value which is 10
  */
 public final class CounterClient implements Closeable {
+  enum Mode {
+    DRY_RUN, IO, ASYNC;
+
+    static Mode parse(String s) {
+      for(Mode m : values()) {
+        if (m.name().equalsIgnoreCase(s)) {
+          return m;
+        }
+      }
+      return DRY_RUN;
+    }
+  }
+
   //build the client
-  private final RaftClient client = RaftClient.newBuilder()
-      .setProperties(new RaftProperties())
-      .setRaftGroup(Constants.RAFT_GROUP)
-      .build();
+  static RaftClient newClient() {
+    return RaftClient.newBuilder()
+        .setProperties(new RaftProperties())
+        .setRaftGroup(Constants.RAFT_GROUP)
+        .build();
+  }
+
+  private final RaftClient client = newClient();
 
   @Override
   public void close() throws IOException {
     client.close();
   }
 
-  private void run(int increment, boolean blocking) throws Exception {
-    System.out.printf("Sending %d %s command(s) using the %s ...%n",
-        increment, CounterCommand.INCREMENT, blocking? "BlockingApi": 
"AsyncApi");
-    final List<Future<RaftClientReply>> futures = new ArrayList<>(increment);
+  static RaftClientReply assertReply(RaftClientReply reply) {
+    Preconditions.assertTrue(reply.isSuccess(), "Failed");
+    return reply;
+  }
+
+  static void send(int increment, Mode mode, RaftClient client) throws 
Exception {
+    final List<CompletableFuture<RaftClientReply>> futures = new 
ArrayList<>(increment);
 
     //send INCREMENT command(s)
-    if (blocking) {
+    if (mode == Mode.IO) {
       // use BlockingApi
-      final ExecutorService executor = Executors.newFixedThreadPool(10);
       for (int i = 0; i < increment; i++) {
-        final Future<RaftClientReply> f = executor.submit(
-            () -> client.io().send(CounterCommand.INCREMENT.getMessage()));
-        futures.add(f);
+        final RaftClientReply reply = 
client.io().send(CounterCommand.INCREMENT.getMessage());
+        futures.add(CompletableFuture.completedFuture(reply));
       }
-      executor.shutdown();
-    } else {
+    } else if (mode == Mode.ASYNC) {
       // use AsyncApi
       for (int i = 0; i < increment; i++) {
-        final Future<RaftClientReply> f = 
client.async().send(CounterCommand.INCREMENT.getMessage());
-        futures.add(f);
+        
futures.add(client.async().send(CounterCommand.INCREMENT.getMessage()).thenApply(CounterClient::assertReply));
       }
+
+      //wait for the futures
+      JavaUtils.allOf(futures).get();
     }
+  }
 
-    //wait for the futures
-    for (Future<RaftClientReply> f : futures) {
-      final RaftClientReply reply = f.get();
-      if (reply.isSuccess()) {
-        final String count = reply.getMessage().getContent().toStringUtf8();
-        System.out.println("Counter is incremented to " + count);
-      } else {
-        System.err.println("Failed " + reply);
-      }
+  private void send(int i, int increment, Mode mode) {
+    System.out.println("Start client " + i);
+    try (RaftClient c = newClient()) {
+      send(increment, mode, c);
+    } catch (Exception e) {
+      throw new CompletionException(e);
+    }
+  }
+
+  private RaftClientReply readCounter(RaftPeerId server) {
+    try {
+      return client.io().sendReadOnly(CounterCommand.GET.getMessage(), server);
+    } catch (IOException e) {
+      System.err.println("Failed read-only request");
+      return RaftClientReply.newBuilder().setSuccess(false).build();
+    }
+  }
+
+  private void readComplete(RaftClientReply reply, Throwable t, RaftPeerId 
server, Timestamp readStarted) {
+    if (t != null) {
+      System.err.println("Failed to get counter from " + server + ": " + t);
+      return;
+    } else if (reply == null || !reply.isSuccess()) {
+      System.err.println("Failed to get counter from " + server + " with reply 
= " + reply);
+      return;
+    }
+
+    // reply is success
+    final TimeDuration readElapsed = readStarted.elapsedTime();
+    final int countValue = 
reply.getMessage().getContent().asReadOnlyByteBuffer().getInt();
+    System.out.printf("read from %s and get counter value: %d, time elapsed: 
%s.%n",
+        server, countValue, readElapsed.toString(TimeUnit.SECONDS, 3));
+  }
+
+  private void run(int increment, Mode mode, int numClients, ExecutorService 
executor) throws Exception {
+    Preconditions.assertTrue(increment > 0, "increment <= 0");
+    Preconditions.assertTrue(numClients > 0, "numClients <= 0");
+    System.out.printf("Sending %d %s command(s) in %s mode with %d client(s) 
...%n",
+        increment, CounterCommand.INCREMENT, mode, numClients);
+    final Timestamp sendStarted = Timestamp.currentTime();
+    ConcurrentUtils.parallelForEachAsync(numClients, i -> send(i, increment, 
mode), executor).get();
+    final TimeDuration sendElapsed = sendStarted.elapsedTime();
+    final long numOp = numClients * (long)increment;
+    
System.out.println("******************************************************");
+    System.out.printf("*   Completed sending %d command(s) in %s%n",
+        numOp, sendElapsed.toString(TimeUnit.SECONDS, 3));
+    System.out.printf("*   The rate is %01.2f op/s%n",
+        numOp * 1000.0 / sendElapsed.toLong(TimeUnit.MILLISECONDS));
+    
System.out.println("******************************************************");
+
+    if (mode == Mode.DRY_RUN) {
+      return;
     }
 
     //send a GET command and print the reply
     final RaftClientReply reply = 
client.io().sendReadOnly(CounterCommand.GET.getMessage());
-    final String count = reply.getMessage().getContent().toStringUtf8();
+    final int count = 
reply.getMessage().getContent().asReadOnlyByteBuffer().getInt();
     System.out.println("Current counter value: " + count);
 
     // using Linearizable Read
-    futures.clear();
-    final long startTime = System.currentTimeMillis();
-    final ExecutorService executor = 
Executors.newFixedThreadPool(Constants.PEERS.size());
-    Constants.PEERS.forEach(p -> {
-      final Future<RaftClientReply> f = CompletableFuture.supplyAsync(() -> {
-                try {
-                  return 
client.io().sendReadOnly(CounterCommand.GET.getMessage(), p.getId());
-                } catch (IOException e) {
-                  System.err.println("Failed read-only request");
-                  return 
RaftClientReply.newBuilder().setSuccess(false).build();
-                }
-              }, executor).whenCompleteAsync((r, ex) -> {
-                if (ex != null || !r.isSuccess()) {
-                  System.err.println("Failed " + r);
-                  return;
-                }
-                final long endTime = System.currentTimeMillis();
-                final long elapsedSec = (endTime-startTime) / 1000;
-                final String countValue = 
r.getMessage().getContent().toStringUtf8();
-                System.out.println("read from " + p.getId() + " and get 
counter value: " + countValue
-                    + ", time elapsed: " + elapsedSec + " seconds");
-              });
-      futures.add(f);
-    });
+    final Timestamp readStarted = Timestamp.currentTime();
+    final List<CompletableFuture<RaftClientReply>> futures = 
Constants.PEERS.stream()
+        .map(RaftPeer::getId)
+        .map(server -> CompletableFuture.supplyAsync(() -> 
readCounter(server), executor)
+        .whenComplete((r, t) -> readComplete(r, t, server, readStarted)))
+        .collect(Collectors.toList());
 
     for (Future<RaftClientReply> f : futures) {
       f.get();
@@ -127,18 +182,27 @@ public final class CounterClient implements Closeable {
     try(CounterClient client = new CounterClient()) {
       //the number of INCREMENT commands, default is 10
       final int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
-      final boolean io = args.length > 1 && "io".equalsIgnoreCase(args[1]);
-      client.run(increment, io);
+      final Mode mode = Mode.parse(args.length > 1? args[1] : null);
+      final int numClients = args.length > 2 ? Integer.parseInt(args[2]) : 1;
+
+      final ExecutorService executor = 
Executors.newFixedThreadPool(Math.max(numClients, Constants.PEERS.size()));
+      try {
+        client.run(increment, mode, numClients, executor);
+      } finally {
+        executor.shutdown();
+      }
     } catch (Throwable e) {
       e.printStackTrace();
       System.err.println();
       System.err.println("args = " + Arrays.toString(args));
       System.err.println();
-      System.err.println("Usage: java 
org.apache.ratis.examples.counter.client.CounterClient [increment] [async|io]");
+      System.err.printf("Usage: java %s [INCREMENT] [DRY_RUN|ASYNC|IO] 
[CLIENTS]%n", CounterClient.class.getName());
       System.err.println();
-      System.err.println("       increment: the number of INCREMENT commands 
to be sent (default is 10)");
-      System.err.println("       async    : use the AsyncApi (default)");
-      System.err.println("       io       : use the BlockingApi");
+      System.err.println("       INCREMENT: the number of INCREMENT commands 
to be sent (default is 10)");
+      System.err.println("       DRY_RUN  : dry run only (default)");
+      System.err.println("       ASYNC    : use the AsyncApi");
+      System.err.println("       IO       : use the BlockingApi");
+      System.err.println("       CLIENTS  : the number of clients (default is 
1)");
       System.exit(1);
     }
   }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
index fab7b36c3..36dca5d6f 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -23,6 +23,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.TimeDuration;
 
@@ -76,6 +77,7 @@ public final class CounterServer implements Closeable {
         .setProperties(properties)
         .setServerId(peer.getId())
         .setStateMachine(counterStateMachine)
+        .setOption(RaftStorage.StartupOption.RECOVER)
         .build();
   }
 
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index 90f41ddad..b88a763e0 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -22,16 +22,18 @@ import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MD5FileUtil;
 import org.apache.ratis.util.TimeDuration;
@@ -42,6 +44,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
@@ -83,10 +86,11 @@ public class CounterStateMachine extends BaseStateMachine {
   private final TimeDuration simulatedSlowness;
 
   public CounterStateMachine(TimeDuration simulatedSlowness) {
-    this.simulatedSlowness = simulatedSlowness;
+    this.simulatedSlowness = simulatedSlowness.isPositive()? 
simulatedSlowness: null;
   }
-  CounterStateMachine() {
-    this.simulatedSlowness = TimeDuration.ZERO;
+
+  public CounterStateMachine() {
+    this(TimeDuration.ZERO);
   }
 
   /** @return the current state. */
@@ -100,13 +104,13 @@ public class CounterStateMachine extends BaseStateMachine 
{
   }
 
   private synchronized int incrementCounter(TermIndex termIndex) {
-    try {
-      if (!simulatedSlowness.equals(TimeDuration.ZERO)) {
+    if (simulatedSlowness != null) {
+      try {
         simulatedSlowness.sleep();
+      } catch (InterruptedException e) {
+        LOG.warn("{}: get interrupted in simulated slowness sleep before apply 
transaction", this);
+        Thread.currentThread().interrupt();
       }
-    } catch (InterruptedException e) {
-      LOG.warn("{}: get interrupted in simulated slowness sleep before apply 
transaction", this);
-      Thread.currentThread().interrupt();
     }
     updateLastAppliedTermIndex(termIndex);
     return counter.incrementAndGet();
@@ -173,19 +177,18 @@ public class CounterStateMachine extends BaseStateMachine 
{
    * Load the state of the state machine from the {@link #storage}.
    *
    * @param snapshot the information of the snapshot being loaded
-   * @return the index of the snapshot or -1 if snapshot is invalid
    * @throws IOException if it failed to read from storage
    */
-  private long load(SingleFileSnapshotInfo snapshot) throws IOException {
+  private void load(SingleFileSnapshotInfo snapshot) throws IOException {
     //check null
     if (snapshot == null) {
-      return RaftLog.INVALID_LOG_INDEX;
+      return;
     }
     //check if the snapshot file exists.
     final Path snapshotPath = snapshot.getFile().getPath();
     if (!Files.exists(snapshotPath)) {
       LOG.warn("The snapshot file {} does not exist for snapshot {}", 
snapshotPath, snapshot);
-      return RaftLog.INVALID_LOG_INDEX;
+      return;
     }
 
     // verify md5
@@ -205,8 +208,6 @@ public class CounterStateMachine extends BaseStateMachine {
 
     //update state
     updateState(last, counterValue);
-
-    return last.getIndex();
   }
 
   /**
@@ -221,7 +222,21 @@ public class CounterStateMachine extends BaseStateMachine {
     if (!CounterCommand.GET.matches(command)) {
       return JavaUtils.completeExceptionally(new 
IllegalArgumentException("Invalid Command: " + command));
     }
-    return 
CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
+    return 
CompletableFuture.completedFuture(Message.valueOf(toByteString(counter.get())));
+  }
+
+  /**
+   * Validate the request and then build a {@link TransactionContext}.
+   */
+  @Override
+  public TransactionContext startTransaction(RaftClientRequest request) throws 
IOException {
+    final TransactionContext transaction = super.startTransaction(request);
+    //check if the command is valid
+    final ByteString command = request.getMessage().getContent();
+    if (!CounterCommand.INCREMENT.matches(command)) {
+      transaction.setException(new IllegalArgumentException("Invalid Command: 
" + command));
+    }
+    return transaction;
   }
 
   /**
@@ -233,22 +248,22 @@ public class CounterStateMachine extends BaseStateMachine 
{
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     final LogEntryProto entry = trx.getLogEntry();
-
-    //check if the command is valid
-    final String command = 
entry.getStateMachineLogEntry().getLogData().toStringUtf8();
-    if (!CounterCommand.INCREMENT.matches(command)) {
-      return JavaUtils.completeExceptionally(new 
IllegalArgumentException("Invalid Command: " + command));
-    }
     //increment the counter and update term-index
     final TermIndex termIndex = TermIndex.valueOf(entry);
-    final long incremented = incrementCounter(termIndex);
+    final int incremented = incrementCounter(termIndex);
 
     //if leader, log the incremented value and the term-index
-    if (trx.getServerRole() == RaftPeerRole.LEADER) {
-      LOG.info("{}: Increment to {}", termIndex, incremented);
+    if (LOG.isDebugEnabled() && trx.getServerRole() == RaftPeerRole.LEADER) {
+      LOG.debug("{}: Increment to {}", termIndex, incremented);
     }
 
     //return the new value of the counter to the client
-    return 
CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
+    return 
CompletableFuture.completedFuture(Message.valueOf(toByteString(incremented)));
+  }
+
+  static ByteString toByteString(int n) {
+    final byte[] array = new byte[4];
+    ByteBuffer.wrap(array).putInt(n);
+    return UnsafeByteOperations.unsafeWrap(array);
   }
 }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
index 69cc4d58d..a846cd1e7 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
@@ -31,7 +31,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.thirdparty.com.google.common.base.MoreObjects;
 import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.TimeDuration;
 
 import java.io.Closeable;
 import java.io.File;
@@ -59,7 +58,7 @@ public class CServer implements Closeable {
     NettyConfigKeys.Server.setPort(properties, port);
 
     // create the counter state machine which holds the counter value.
-    final CounterStateMachine counterStateMachine = new 
CounterStateMachine(TimeDuration.ZERO);
+    final CounterStateMachine counterStateMachine = new CounterStateMachine();
 
     // build the Raft server.
     this.server = RaftServer.newBuilder()
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
index 953fd5bfa..6fbe8770e 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -49,17 +49,17 @@ public class TestCounter extends ParameterizedBaseTest {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply1 = 
client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("10", 
reply1.getMessage().getContent().toStringUtf8());
+      Assert.assertEquals(10, 
reply1.getMessage().getContent().asReadOnlyByteBuffer().getInt());
       for (int i = 0; i < 10; i++) {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply2 = 
client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("20", 
reply2.getMessage().getContent().toStringUtf8());
+      Assert.assertEquals(20, 
reply2.getMessage().getContent().asReadOnlyByteBuffer().getInt());
       for (int i = 0; i < 10; i++) {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply3 = 
client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("30", 
reply3.getMessage().getContent().toStringUtf8());
+      Assert.assertEquals(30, 
reply3.getMessage().getContent().asReadOnlyByteBuffer().getInt());
     }
   }
 }

Reply via email to