This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new af71d409a RATIS-1901. Update Counter example to benchmark performance.
(#953)
af71d409a is described below
commit af71d409a2a3847a6cda602bba5fb995c1c8a529
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Oct 30 21:23:25 2023 -0700
RATIS-1901. Update Counter example to benchmark performance. (#953)
---
.../org/apache/ratis/util/ConcurrentUtils.java | 29 +++-
.../main/java/org/apache/ratis/util/JavaUtils.java | 9 ++
.../apache/ratis/examples/common/Constants.java | 44 ++++-
.../ratis/examples/counter/CounterCommand.java | 6 +
.../examples/counter/client/CounterClient.java | 178 ++++++++++++++-------
.../examples/counter/server/CounterServer.java | 2 +
.../counter/server/CounterStateMachine.java | 67 +++++---
.../ratis/examples/membership/server/CServer.java | 3 +-
.../apache/ratis/examples/counter/TestCounter.java | 6 +-
9 files changed, 241 insertions(+), 103 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index 372fa62bc..b6e2e9bf6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Stream;
/**
* Utilities related to concurrent programming.
@@ -153,13 +154,14 @@ public interface ConcurrentUtils {
}
/**
- * The same as collection.parallelStream().forEach(action) except that
+ * The same as {@link Collection#parallelStream()}.forEach(action) except
that
* (1) this method is asynchronous,
- * (2) an executor can be passed to this method, and
+ * (2) this method has an executor parameter, and
* (3) the action can throw a checked exception.
*
- * @param collection The given collection.
- * @param action To act on each element in the collection.
+ * @param stream The stream to be processed.
+ * @param size The estimated size of the stream.
+ * @param action To act on each element in the stream.
* @param executor To execute the action.
* @param <E> The element type.
* @param <THROWABLE> the exception type.
@@ -172,9 +174,9 @@ public interface ConcurrentUtils {
* @see java.util.stream.Stream#forEach(Consumer)
*/
static <E, THROWABLE extends Throwable> CompletableFuture<Void>
parallelForEachAsync(
- Collection<E> collection, CheckedConsumer<? super E, THROWABLE> action,
Executor executor) {
- final List<CompletableFuture<E>> futures = new
ArrayList<>(collection.size());
- collection.forEach(element -> {
+ Stream<E> stream, int size, CheckedConsumer<? super E, THROWABLE>
action, Executor executor) {
+ final List<CompletableFuture<E>> futures = new ArrayList<>(size);
+ stream.forEach(element -> {
final CompletableFuture<E> f = new CompletableFuture<>();
futures.add(f);
executor.execute(() -> accept(action, element, f));
@@ -182,6 +184,19 @@ public interface ConcurrentUtils {
return JavaUtils.allOf(futures);
}
+ /** The same as parallelForEachAsync(collection.stream(), collection.size(),
action, executor). */
+ static <E, THROWABLE extends Throwable> CompletableFuture<Void>
parallelForEachAsync(
+ Collection<E> collection, CheckedConsumer<? super E, THROWABLE> action,
Executor executor) {
+ return parallelForEachAsync(collection.stream(), collection.size(),
action, executor);
+ }
+
+ /** The same as parallelForEachAsync(IntStream.range(0, size).boxed(), size,
action, executor), i.e. the action is applied to each index in [0, size). */
+ static <THROWABLE extends Throwable> CompletableFuture<Void>
parallelForEachAsync(
+ int size, CheckedConsumer<Integer, THROWABLE> action, Executor executor)
{
+ final AtomicInteger i = new AtomicInteger();
+ return
parallelForEachAsync(Stream.generate(i::getAndIncrement).limit(size), size,
action, executor);
+ }
+
static <E, THROWABLE extends Throwable> void accept(
CheckedConsumer<? super E, THROWABLE> action, E element,
CompletableFuture<E> f) {
try {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 80162e101..ff6f0f93d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -152,6 +152,12 @@ public interface JavaUtils {
return doPrivileged(() -> System.getProperty(key), () -> "get system
property " + key);
}
+ static String getEnv(String variable) {
+ final String value = System.getenv().get(variable);
+ LOG.info("ENV: {} = {}", variable, value);
+ return value;
+ }
+
/**
* Similar to {@link System#setProperty(String, String)}
* except that this method may invoke {@link
AccessController#doPrivileged(PrivilegedAction)}
@@ -275,6 +281,9 @@ public interface JavaUtils {
}
static <T> CompletableFuture<Void> allOf(Collection<CompletableFuture<T>>
futures) {
+ if (futures == null || futures.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
return
CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY));
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
index c7559597e..ba80aeb4f 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
@@ -21,18 +21,22 @@ package org.apache.ratis.examples.common;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.Reader;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -40,22 +44,46 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Constants across servers and clients
*/
public final class Constants {
+ private static final Logger LOG = LoggerFactory.getLogger(Constants.class);
+
+ private static final String CONF_FILE_NAME = "conf.properties";
+ private static final List<String> CONF_FILE_DEFAULTS =
Collections.unmodifiableList(Arrays.asList(
+ "examples/conf/" + CONF_FILE_NAME, // for release tarball layout
+ "ratis-examples/src/main/resources/" + CONF_FILE_NAME)); // for source
tree layout
+ private static final String CONF_FILE_ENV_VAR_NAME = "RATIS_EXAMPLE_CONF";
+
+ static Path getConfPath() {
+ final String env = JavaUtils.getEnv(CONF_FILE_ENV_VAR_NAME);
+ final Stream<String> s = Stream.concat(
+ Optional.ofNullable(env).map(Stream::of).orElseGet(Stream::empty),
+ CONF_FILE_DEFAULTS.stream());
+ for(final Iterator<String> i = s.iterator(); i.hasNext(); ) {
+ final Path p = Paths.get(i.next());
+ if (Files.exists(p)) {
+ LOG.info("Using conf file {}", p);
+ return p;
+ }
+ }
+ throw new IllegalArgumentException("Conf file not found: please set
environment variable \""
+ + CONF_FILE_ENV_VAR_NAME + "\"");
+ }
+
public static final List<RaftPeer> PEERS;
public static final String PATH;
public static final List<TimeDuration> SIMULATED_SLOWNESS;
static {
final Properties properties = new Properties();
- final String conf = "ratis-examples/src/main/resources/conf.properties";
- try(InputStream inputStream = new FileInputStream(conf);
- Reader reader = new InputStreamReader(inputStream,
StandardCharsets.UTF_8);
- BufferedReader bufferedReader = new BufferedReader(reader)) {
- properties.load(bufferedReader);
+ final Path conf = getConfPath();
+ try(BufferedReader in = new BufferedReader(new InputStreamReader(
+ Files.newInputStream(conf), StandardCharsets.UTF_8))) {
+ properties.load(in);
} catch (IOException e) {
throw new IllegalStateException("Failed to load " + conf, e);
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
index 843a158fb..905cc004b 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
@@ -18,6 +18,7 @@
package org.apache.ratis.examples.counter;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
/**
* The supported commands the Counter example.
@@ -38,4 +39,9 @@ public enum CounterCommand {
public boolean matches(String command) {
return name().equalsIgnoreCase(command);
}
+
+ /** Does the given command content (as a ByteString) match this command? */
+ public boolean matches(ByteString command) {
+ return message.getContent().equals(command);
+ }
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index 7076809f5..cb6c72725 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -22,6 +22,13 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.examples.common.Constants;
import org.apache.ratis.examples.counter.CounterCommand;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.ConcurrentUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
import java.io.Closeable;
import java.io.IOException;
@@ -29,9 +36,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* Counter client application, this application sends specific number of
@@ -42,81 +52,126 @@ import java.util.concurrent.Future;
* parameter found, application use default value which is 10
*/
public final class CounterClient implements Closeable {
+ enum Mode {
+ DRY_RUN, IO, ASYNC;
+
+ static Mode parse(String s) {
+ for(Mode m : values()) {
+ if (m.name().equalsIgnoreCase(s)) {
+ return m;
+ }
+ }
+ return DRY_RUN;
+ }
+ }
+
//build the client
- private final RaftClient client = RaftClient.newBuilder()
- .setProperties(new RaftProperties())
- .setRaftGroup(Constants.RAFT_GROUP)
- .build();
+ static RaftClient newClient() {
+ return RaftClient.newBuilder()
+ .setProperties(new RaftProperties())
+ .setRaftGroup(Constants.RAFT_GROUP)
+ .build();
+ }
+
+ private final RaftClient client = newClient();
@Override
public void close() throws IOException {
client.close();
}
- private void run(int increment, boolean blocking) throws Exception {
- System.out.printf("Sending %d %s command(s) using the %s ...%n",
- increment, CounterCommand.INCREMENT, blocking? "BlockingApi":
"AsyncApi");
- final List<Future<RaftClientReply>> futures = new ArrayList<>(increment);
+ static RaftClientReply assertReply(RaftClientReply reply) {
+ Preconditions.assertTrue(reply.isSuccess(), "Failed");
+ return reply;
+ }
+
+ static void send(int increment, Mode mode, RaftClient client) throws
Exception {
+ final List<CompletableFuture<RaftClientReply>> futures = new
ArrayList<>(increment);
//send INCREMENT command(s)
- if (blocking) {
+ if (mode == Mode.IO) {
// use BlockingApi
- final ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < increment; i++) {
- final Future<RaftClientReply> f = executor.submit(
- () -> client.io().send(CounterCommand.INCREMENT.getMessage()));
- futures.add(f);
+ final RaftClientReply reply =
client.io().send(CounterCommand.INCREMENT.getMessage());
+ futures.add(CompletableFuture.completedFuture(reply));
}
- executor.shutdown();
- } else {
+ } else if (mode == Mode.ASYNC) {
// use AsyncApi
for (int i = 0; i < increment; i++) {
- final Future<RaftClientReply> f =
client.async().send(CounterCommand.INCREMENT.getMessage());
- futures.add(f);
+
futures.add(client.async().send(CounterCommand.INCREMENT.getMessage()).thenApply(CounterClient::assertReply));
}
+
+ //wait for the futures
+ JavaUtils.allOf(futures).get();
}
+ }
- //wait for the futures
- for (Future<RaftClientReply> f : futures) {
- final RaftClientReply reply = f.get();
- if (reply.isSuccess()) {
- final String count = reply.getMessage().getContent().toStringUtf8();
- System.out.println("Counter is incremented to " + count);
- } else {
- System.err.println("Failed " + reply);
- }
+ private void send(int i, int increment, Mode mode) {
+ System.out.println("Start client " + i);
+ try (RaftClient c = newClient()) {
+ send(increment, mode, c);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }
+
+ private RaftClientReply readCounter(RaftPeerId server) {
+ try {
+ return client.io().sendReadOnly(CounterCommand.GET.getMessage(), server);
+ } catch (IOException e) {
+ System.err.println("Failed read-only request");
+ return RaftClientReply.newBuilder().setSuccess(false).build();
+ }
+ }
+
+ private void readComplete(RaftClientReply reply, Throwable t, RaftPeerId
server, Timestamp readStarted) {
+ if (t != null) {
+ System.err.println("Failed to get counter from " + server + ": " + t);
+ return;
+ } else if (reply == null || !reply.isSuccess()) {
+ System.err.println("Failed to get counter from " + server + " with reply
= " + reply);
+ return;
+ }
+
+ // reply is success
+ final TimeDuration readElapsed = readStarted.elapsedTime();
+ final int countValue =
reply.getMessage().getContent().asReadOnlyByteBuffer().getInt();
+ System.out.printf("read from %s and get counter value: %d, time elapsed:
%s.%n",
+ server, countValue, readElapsed.toString(TimeUnit.SECONDS, 3));
+ }
+
+ private void run(int increment, Mode mode, int numClients, ExecutorService
executor) throws Exception {
+ Preconditions.assertTrue(increment > 0, "increment <= 0");
+ Preconditions.assertTrue(numClients > 0, "numClients <= 0");
+ System.out.printf("Sending %d %s command(s) in %s mode with %d client(s)
...%n",
+ increment, CounterCommand.INCREMENT, mode, numClients);
+ final Timestamp sendStarted = Timestamp.currentTime();
+ ConcurrentUtils.parallelForEachAsync(numClients, i -> send(i, increment,
mode), executor).get();
+ final TimeDuration sendElapsed = sendStarted.elapsedTime();
+ final long numOp = numClients * (long)increment;
+
System.out.println("******************************************************");
+ System.out.printf("* Completed sending %d command(s) in %s%n",
+ numOp, sendElapsed.toString(TimeUnit.SECONDS, 3));
+ System.out.printf("* The rate is %01.2f op/s%n",
+ numOp * 1000.0 / sendElapsed.toLong(TimeUnit.MILLISECONDS));
+
System.out.println("******************************************************");
+
+ if (mode == Mode.DRY_RUN) {
+ return;
}
//send a GET command and print the reply
final RaftClientReply reply =
client.io().sendReadOnly(CounterCommand.GET.getMessage());
- final String count = reply.getMessage().getContent().toStringUtf8();
+ final int count =
reply.getMessage().getContent().asReadOnlyByteBuffer().getInt();
System.out.println("Current counter value: " + count);
// using Linearizable Read
- futures.clear();
- final long startTime = System.currentTimeMillis();
- final ExecutorService executor =
Executors.newFixedThreadPool(Constants.PEERS.size());
- Constants.PEERS.forEach(p -> {
- final Future<RaftClientReply> f = CompletableFuture.supplyAsync(() -> {
- try {
- return
client.io().sendReadOnly(CounterCommand.GET.getMessage(), p.getId());
- } catch (IOException e) {
- System.err.println("Failed read-only request");
- return
RaftClientReply.newBuilder().setSuccess(false).build();
- }
- }, executor).whenCompleteAsync((r, ex) -> {
- if (ex != null || !r.isSuccess()) {
- System.err.println("Failed " + r);
- return;
- }
- final long endTime = System.currentTimeMillis();
- final long elapsedSec = (endTime-startTime) / 1000;
- final String countValue =
r.getMessage().getContent().toStringUtf8();
- System.out.println("read from " + p.getId() + " and get
counter value: " + countValue
- + ", time elapsed: " + elapsedSec + " seconds");
- });
- futures.add(f);
- });
+ final Timestamp readStarted = Timestamp.currentTime();
+ final List<CompletableFuture<RaftClientReply>> futures =
Constants.PEERS.stream()
+ .map(RaftPeer::getId)
+ .map(server -> CompletableFuture.supplyAsync(() ->
readCounter(server), executor)
+ .whenComplete((r, t) -> readComplete(r, t, server, readStarted)))
+ .collect(Collectors.toList());
for (Future<RaftClientReply> f : futures) {
f.get();
@@ -127,18 +182,27 @@ public final class CounterClient implements Closeable {
try(CounterClient client = new CounterClient()) {
//the number of INCREMENT commands, default is 10
final int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
- final boolean io = args.length > 1 && "io".equalsIgnoreCase(args[1]);
- client.run(increment, io);
+ final Mode mode = Mode.parse(args.length > 1? args[1] : null);
+ final int numClients = args.length > 2 ? Integer.parseInt(args[2]) : 1;
+
+ final ExecutorService executor =
Executors.newFixedThreadPool(Math.max(numClients, Constants.PEERS.size()));
+ try {
+ client.run(increment, mode, numClients, executor);
+ } finally {
+ executor.shutdown();
+ }
} catch (Throwable e) {
e.printStackTrace();
System.err.println();
System.err.println("args = " + Arrays.toString(args));
System.err.println();
- System.err.println("Usage: java
org.apache.ratis.examples.counter.client.CounterClient [increment] [async|io]");
+ System.err.printf("Usage: java %s [INCREMENT] [DRY_RUN|ASYNC|IO]
[CLIENTS]%n", CounterClient.class.getName());
System.err.println();
- System.err.println(" increment: the number of INCREMENT commands
to be sent (default is 10)");
- System.err.println(" async : use the AsyncApi (default)");
- System.err.println(" io : use the BlockingApi");
+ System.err.println(" INCREMENT: the number of INCREMENT commands
to be sent (default is 10)");
+ System.err.println(" DRY_RUN : dry run only (default)");
+ System.err.println(" ASYNC : use the AsyncApi");
+ System.err.println(" IO : use the BlockingApi");
+ System.err.println(" CLIENTS : the number of clients (default is
1)");
System.exit(1);
}
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
index fab7b36c3..36dca5d6f 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -23,6 +23,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TimeDuration;
@@ -76,6 +77,7 @@ public final class CounterServer implements Closeable {
.setProperties(properties)
.setServerId(peer.getId())
.setStateMachine(counterStateMachine)
+ .setOption(RaftStorage.StartupOption.RECOVER)
.build();
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index 90f41ddad..b88a763e0 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -22,16 +22,18 @@ import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.TimeDuration;
@@ -42,6 +44,7 @@ import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
@@ -83,10 +86,11 @@ public class CounterStateMachine extends BaseStateMachine {
private final TimeDuration simulatedSlowness;
public CounterStateMachine(TimeDuration simulatedSlowness) {
- this.simulatedSlowness = simulatedSlowness;
+ this.simulatedSlowness = simulatedSlowness.isPositive()?
simulatedSlowness: null;
}
- CounterStateMachine() {
- this.simulatedSlowness = TimeDuration.ZERO;
+
+ public CounterStateMachine() {
+ this(TimeDuration.ZERO);
}
/** @return the current state. */
@@ -100,13 +104,13 @@ public class CounterStateMachine extends BaseStateMachine
{
}
private synchronized int incrementCounter(TermIndex termIndex) {
- try {
- if (!simulatedSlowness.equals(TimeDuration.ZERO)) {
+ if (simulatedSlowness != null) {
+ try {
simulatedSlowness.sleep();
+ } catch (InterruptedException e) {
+ LOG.warn("{}: get interrupted in simulated slowness sleep before apply
transaction", this);
+ Thread.currentThread().interrupt();
}
- } catch (InterruptedException e) {
- LOG.warn("{}: get interrupted in simulated slowness sleep before apply
transaction", this);
- Thread.currentThread().interrupt();
}
updateLastAppliedTermIndex(termIndex);
return counter.incrementAndGet();
@@ -173,19 +177,18 @@ public class CounterStateMachine extends BaseStateMachine
{
* Load the state of the state machine from the {@link #storage}.
*
* @param snapshot the information of the snapshot being loaded
- * @return the index of the snapshot or -1 if snapshot is invalid
* @throws IOException if it failed to read from storage
*/
- private long load(SingleFileSnapshotInfo snapshot) throws IOException {
+ private void load(SingleFileSnapshotInfo snapshot) throws IOException {
//check null
if (snapshot == null) {
- return RaftLog.INVALID_LOG_INDEX;
+ return;
}
//check if the snapshot file exists.
final Path snapshotPath = snapshot.getFile().getPath();
if (!Files.exists(snapshotPath)) {
LOG.warn("The snapshot file {} does not exist for snapshot {}",
snapshotPath, snapshot);
- return RaftLog.INVALID_LOG_INDEX;
+ return;
}
// verify md5
@@ -205,8 +208,6 @@ public class CounterStateMachine extends BaseStateMachine {
//update state
updateState(last, counterValue);
-
- return last.getIndex();
}
/**
@@ -221,7 +222,21 @@ public class CounterStateMachine extends BaseStateMachine {
if (!CounterCommand.GET.matches(command)) {
return JavaUtils.completeExceptionally(new
IllegalArgumentException("Invalid Command: " + command));
}
- return
CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
+ return
CompletableFuture.completedFuture(Message.valueOf(toByteString(counter.get())));
+ }
+
+ /**
+ * Validate the request and then build a {@link TransactionContext}.
+ */
+ @Override
+ public TransactionContext startTransaction(RaftClientRequest request) throws
IOException {
+ final TransactionContext transaction = super.startTransaction(request);
+ //check if the command is valid
+ final ByteString command = request.getMessage().getContent();
+ if (!CounterCommand.INCREMENT.matches(command)) {
+ transaction.setException(new IllegalArgumentException("Invalid Command:
" + command));
+ }
+ return transaction;
}
/**
@@ -233,22 +248,22 @@ public class CounterStateMachine extends BaseStateMachine
{
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
final LogEntryProto entry = trx.getLogEntry();
-
- //check if the command is valid
- final String command =
entry.getStateMachineLogEntry().getLogData().toStringUtf8();
- if (!CounterCommand.INCREMENT.matches(command)) {
- return JavaUtils.completeExceptionally(new
IllegalArgumentException("Invalid Command: " + command));
- }
//increment the counter and update term-index
final TermIndex termIndex = TermIndex.valueOf(entry);
- final long incremented = incrementCounter(termIndex);
+ final int incremented = incrementCounter(termIndex);
//if leader, log the incremented value and the term-index
- if (trx.getServerRole() == RaftPeerRole.LEADER) {
- LOG.info("{}: Increment to {}", termIndex, incremented);
+ if (LOG.isDebugEnabled() && trx.getServerRole() == RaftPeerRole.LEADER) {
+ LOG.debug("{}: Increment to {}", termIndex, incremented);
}
//return the new value of the counter to the client
- return
CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
+ return
CompletableFuture.completedFuture(Message.valueOf(toByteString(incremented)));
+ }
+
+ static ByteString toByteString(int n) {
+ final byte[] array = new byte[4];
+ ByteBuffer.wrap(array).putInt(n);
+ return UnsafeByteOperations.unsafeWrap(array);
}
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
index 69cc4d58d..a846cd1e7 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
@@ -31,7 +31,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.thirdparty.com.google.common.base.MoreObjects;
import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.TimeDuration;
import java.io.Closeable;
import java.io.File;
@@ -59,7 +58,7 @@ public class CServer implements Closeable {
NettyConfigKeys.Server.setPort(properties, port);
// create the counter state machine which holds the counter value.
- final CounterStateMachine counterStateMachine = new
CounterStateMachine(TimeDuration.ZERO);
+ final CounterStateMachine counterStateMachine = new CounterStateMachine();
// build the Raft server.
this.server = RaftServer.newBuilder()
diff --git
a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
index 953fd5bfa..6fbe8770e 100644
---
a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
+++
b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -49,17 +49,17 @@ public class TestCounter extends ParameterizedBaseTest {
client.io().send(Message.valueOf("INCREMENT"));
}
RaftClientReply reply1 =
client.io().sendReadOnly(Message.valueOf("GET"));
- Assert.assertEquals("10",
reply1.getMessage().getContent().toStringUtf8());
+ Assert.assertEquals(10,
reply1.getMessage().getContent().asReadOnlyByteBuffer().getInt());
for (int i = 0; i < 10; i++) {
client.io().send(Message.valueOf("INCREMENT"));
}
RaftClientReply reply2 =
client.io().sendReadOnly(Message.valueOf("GET"));
- Assert.assertEquals("20",
reply2.getMessage().getContent().toStringUtf8());
+ Assert.assertEquals(20,
reply2.getMessage().getContent().asReadOnlyByteBuffer().getInt());
for (int i = 0; i < 10; i++) {
client.io().send(Message.valueOf("INCREMENT"));
}
RaftClientReply reply3 =
client.io().sendReadOnly(Message.valueOf("GET"));
- Assert.assertEquals("30",
reply3.getMessage().getContent().toStringUtf8());
+ Assert.assertEquals(30,
reply3.getMessage().getContent().asReadOnlyByteBuffer().getInt());
}
}
}