This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 07f04cbd299e653cbad4bd1589b2b46aa1427bf7 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Mon Aug 8 18:57:47 2022 -0700 RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698) * RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. * Revert ArithmeticStateMachine.java * Add an option to use AsyncApi * Fix a long line * Minor changes in CounterClient. * Remove the use of Charset. * Revert some CounterServer change for a findbugs warning. * Fix a bug in CounterServer.main. (cherry picked from commit 1009a6c385373b3280cc9066e02ce050911627f8) --- ratis-docs/src/site/markdown/start/index.md | 653 +++++++++++++-------- .../apache/ratis/examples/common/Constants.java | 5 +- .../ratis/examples/counter/CounterCommand.java | 41 ++ .../examples/counter/client/CounterClient.java | 118 ++-- .../examples/counter/server/CounterServer.java | 59 +- .../counter/server/CounterStateMachine.java | 205 ++++--- .../apache/ratis/examples/counter/TestCounter.java | 10 +- .../ratis/statemachine/impl/BaseStateMachine.java | 9 +- 8 files changed, 679 insertions(+), 421 deletions(-) diff --git a/ratis-docs/src/site/markdown/start/index.md b/ratis-docs/src/site/markdown/start/index.md index 7aca21f9b..beaa519ea 100644 --- a/ratis-docs/src/site/markdown/start/index.md +++ b/ratis-docs/src/site/markdown/start/index.md @@ -18,23 +18,51 @@ # Getting Started Let's get started to use Raft in your application. To demonstrate how to use Ratis, -we'll implement a simple counter service, -which maintains a counter value across a cluster. -The client could send the following types of commands to the cluster: +we implement a simple *Counter* service, +which maintains a counter value across a raft group. +Clients could send the following types of requests to the raft group: -* `INCREMENT`: increase the counter value -* `GET`: query the current value of the counter, -we call such kind of commands as read-only commands +* `INCREMENT`: increase the counter value by 1. +This command will trigger a transaction to change the state. +* `GET`: query the current value of the counter. +This is a read-only command since it does not change the state. -Note: The full source could be found at [Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples). -This article is mainly intended to show the steps of integration of Ratis, -if you wish to run this example or find more examples, +We have the following `enum` for representing the supported commands. + +```java +/** + * The supported commands the Counter example. + */ +public enum CounterCommand { + /** Increment the counter by 1. */ + INCREMENT, + /** Get the counter value. */ + GET; + + private final Message message = Message.valueOf(name()); + + public Message getMessage() { + return message; + } + + /** Does the given command string match this command? */ + public boolean matches(String command) { + return name().equalsIgnoreCase(command); + } +} +``` + +Note: +The source code of the Counter example and the other examples is at +[Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples). +This article intends to show the steps of integration of Ratis. +If you wish to run the Counter example please refer to [the README](https://github.com/apache/ratis/tree/master/ratis-examples#example-3-counter). -## Add the dependency +## Adding the Dependency -First, we need to add Ratis dependencies into the project, -it's available in maven central: +The first step is to add Ratis dependencies into the project. +The dependencies are available in maven central: ```xml <dependency> @@ -43,13 +71,14 @@ it's available in maven central: </dependency> ``` -Also, one of the following transports need to be added: +Then, add one of the following transports: -* grpc -* netty -* hadoop +* ratis-grpc +* ratis-netty +* ratis-hadoop -For example, let's use grpc transport: +In this example, +we choose to use ratis-grpc: ```xml <dependency> @@ -61,294 +90,436 @@ For example, let's use grpc transport: Please note that Apache Hadoop dependencies are shaded, so it’s safe to use hadoop transport with different versions of Hadoop. -## Create the StateMachine -A state machine is used to maintain the current state of the raft node, -the state machine is responsible for: - -* Execute raft logs to get the state. In this example, when a `INCREMENT` log is executed, -the counter value will be increased by 1. -And a `GET` log does not affect the state but only returns the current counter value to the client. -* Managing snapshots loading/saving. -Snapshots are used to speed the log execution, -the state machine could start from a snapshot point and only execute newer logs. - -### Define the StateMachine -To define our state machine, -we can extend a class from the base class `BaseStateMachine`. - -Also, a storage is needed to store snapshots, -and we'll use the build-in `SimpleStateMachineStorage`, -which is a file-based storage implementation. - -Since we're going to implement a count server, -the `counter` instance is defined in the state machine, -represents the current value. -Below is the declaration of the state machine: +## Implementing the `CounterStateMachine` +A state machine manages the application logic. +The state machine is responsible for: + +* Apply raft log transactions in order to maintain the current state. + * When there is an `INCREMENT` request, + it will first be written to the raft log as a log entry. + Once the log entry is committed, + the state machine will be invoked for applying the log entry as a transaction + so that the counter value will be increased by 1. + * When there is a `GET` request, + it will not be written to the raft log + since it is a readonly request which does not change the state. + The state machine should return the current value of the counter. +* Manage snapshots loading/saving. + * Snapshots are used for log compaction + so that the state machine can be restored from a snapshot + and then applies only the newer log entries, + instead of applying a long history of log starting from the beginning. + +We discuss how to implement `CounterStateMachine` in the following subsections. +The complete source code of it is in +[CounterStateMachine.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java). + +### Defining the State +In this example, +the `CounterStateMachine` extends the `BaseStateMachine`, +which provides a base implementation of a `StateMachine`. + +Inside the `CounterStateMachine`, +there is a `counter` object +which stores the current value. +The `counter` is an `AtomicInteger` +in order to support concurrent access. +We use the build-in `SimpleStateMachineStorage`, +which is a file-based storage implementation, +as a storage for storing snapshots. +The fields are shown below: ```java public class CounterStateMachine extends BaseStateMachine { - private final SimpleStateMachineStorage storage = - new SimpleStateMachineStorage(); - private AtomicInteger counter = new AtomicInteger(0); - // ... + // ... + + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); + private final AtomicInteger counter = new AtomicInteger(0); + + // ... } ``` -### Apply Raft Log Item +### Applying Raft Log Entries -Once the raft log is committed, -Ratis will notify state machine by invoking the `public CompletableFuture<Message> applyTransaction(TransactionContext trx)` method, -and we need to override this method to decode the message and apply it. - -First, get the log content and decode it: +Once a raft log entry has been committed +(i.e. a majority of the servers have acknowledged), +Ratis notifies the state machine by invoking the `applyTransaction` method. +The `applyTransaction` method first validates the log entry. +Then, it applies the log entry by increasing the counter value and updates the term-index. +The code fragments are shown below. +Note that the `incrementCounter` method is synchronized +in order to update both counter and last applied term-index atomically. ```java public class CounterStateMachine extends BaseStateMachine { - // ... - public CompletableFuture<Message> applyTransaction(TransactionContext trx) { - final RaftProtos.LogEntryProto entry = trx.getLogEntry(); - String logData = entry.getStateMachineLogEntry().getLogData() - .toString(Charset.defaultCharset()); - if (!logData.equals("INCREMENT")) { - return CompletableFuture.completedFuture( - Message.valueOf("Invalid Command")); - } - // ... + // ... + + private synchronized int incrementCounter(TermIndex termIndex) { + updateLastAppliedTermIndex(termIndex); + return counter.incrementAndGet(); + } + + // ... + + /** + * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object. + * + * @param trx the transaction context + * @return the message containing the updated counter value + */ + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + final LogEntryProto entry = trx.getLogEntry(); + + //check if the command is valid + final String command = entry.getStateMachineLogEntry().getLogData().toString(Charset.defaultCharset()); + if (!CounterCommand.INCREMENT.match(command)) { + return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command)); } + //increment the counter and update term-index + final TermIndex termIndex = TermIndex.valueOf(entry); + final long incremented = incrementCounter(termIndex); + + //if leader, log the incremented value and the term-index + if (trx.getServerRole() == RaftPeerRole.LEADER) { + LOG.info("{}: Increment to {}", termIndex, incremented); + } + + //return the new value of the counter to the client + return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented))); + } + + // ... } ``` -After that, if the log is valid, -we could apply it by increasing the counter value. -Remember that we also need to update the committed indexes: +### Processing Readonly Commands +The `INCREMENT` command is implemented in the previous section. +What about the `GET` command? +Since the `GET` command is a readonly command, +it is implemented by the `query` method instead of the `applyTransaction` method. +The code fragment is shown below. ```java public class CounterStateMachine extends BaseStateMachine { - // ... - public CompletableFuture<Message> applyTransaction(TransactionContext trx) { - // ... - final long index = entry.getIndex(); - updateLastAppliedTermIndex(entry.getTerm(), index); - - //actual execution of the command: increment the counter - counter.incrementAndGet(); - - //return the new value of the counter to the client - final CompletableFuture<Message> f = - CompletableFuture.completedFuture(Message.valueOf(counter.toString())); - return f; + // ... + + /** + * Process {@link CounterCommand#GET}, which gets the counter value. + * + * @param request the GET request + * @return a {@link Message} containing the current counter value as a {@link String}. + */ + @Override + public CompletableFuture<Message> query(Message request) { + final String command = request.getContent().toString(Charset.defaultCharset()); + if (!CounterCommand.GET.match(command)) { + return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command)); } + return CompletableFuture.completedFuture(Message.valueOf(counter.toString())); + } + + // ... } ``` -### Handle Readonly Command -Note that we only handled `INCREMENT` command, -what about the `GET` command? -The `GET` command is implemented as a readonly command, -so we'll need to implement `public CompletableFuture<Message> query(Message request)` instead of `applyTransaction`. +### Taking Snapshots +When taking a snapshot, +the state is persisted in the storage of the state machine. +The snapshot can be loaded for restoring the state in the future. +In this example, +we use `ObjectOutputStream` to write the counter value to a snapshot file. +The term-index is stored in the file name of the snapshot file. +The code fragments are shown below. +Note that the `getState` method is synchronized +in order to get the applied term-index and the counter value atomically. +Note also that getting the counter value alone does not have to be synchronized +since the `counter` field is already an `AtomicInteger`. ```java public class CounterStateMachine extends BaseStateMachine { - // ... - @Override - public CompletableFuture<Message> query(Message request) { - String msg = request.getContent().toString(Charset.defaultCharset()); - if (!msg.equals("GET")) { - return CompletableFuture.completedFuture( - Message.valueOf("Invalid Command")); - } - return CompletableFuture.completedFuture( - Message.valueOf(counter.toString())); + // ... + + /** The state of the {@link CounterStateMachine}. */ + static class CounterState { + private final TermIndex applied; + private final int counter; + + CounterState(TermIndex applied, int counter) { + this.applied = applied; + this.counter = counter; + } + + TermIndex getApplied() { + return applied; + } + + int getCounter() { + return counter; + } + } + + // ... + + /** @return the current state. */ + private synchronized CounterState getState() { + return new CounterState(getLastAppliedTermIndex(), counter.get()); + } + + // ... + + /** + * Store the current state as a snapshot file in the {@link #storage}. + * + * @return the index of the snapshot + */ + @Override + public long takeSnapshot() { + //get the current state + final CounterState state = getState(); + final long index = state.getApplied().getIndex(); + + //create a file with a proper name to store the snapshot + final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index); + + //write the counter value into the snapshot file + try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream( + Files.newOutputStream(snapshotFile.toPath())))) { + out.writeInt(state.getCounter()); + } catch (IOException ioe) { + LOG.warn("Failed to write snapshot file \"" + snapshotFile + + "\", last applied index=" + state.getApplied()); } + + //return the index of the stored snapshot (which is the last applied one) + return index; + } + + // ... } ``` -### Save and Load Snapshots -When taking a snapshot, -we persist every state in the state machine, -and the value could be loaded directly to the state in the future. -In this example, -the only state is the counter value, -we're going to use `ObjectOutputStream` to write it to a snapshot file: +### Loading Snapshots +When loading a snapshot, +we use an `ObjectInputStream` to read the snapshot file. +The term-index is read from the file name of the snapshot file. +The code fragments are shown below. +Note that the `updateState` method is synchronized +in order to update the applied term-index and the counter value atomically. ```java public class CounterStateMachine extends BaseStateMachine { - // ... - @Override - public long takeSnapshot() { - //get the last applied index - final TermIndex last = getLastAppliedTermIndex(); - - //create a file with a proper name to store the snapshot - final File snapshotFile = - storage.getSnapshotFile(last.getTerm(), last.getIndex()); - - //serialize the counter object and write it into the snapshot file - try (ObjectOutputStream out = new ObjectOutputStream( - new BufferedOutputStream(new FileOutputStream(snapshotFile)))) { - out.writeObject(counter); - } catch (IOException ioe) { - LOG.warn("Failed to write snapshot file \"" + snapshotFile - + "\", last applied index=" + last); - } - - //return the index of the stored snapshot (which is the last applied one) - return last.getIndex(); + // ... + + private synchronized void updateState(TermIndex applied, int counterValue) { + updateLastAppliedTermIndex(applied); + counter.set(counterValue); + } + + // ... + + /** + * Load the state of the state machine from the {@link #storage}. + * + * @param snapshot the information of the snapshot being loaded + * @return the index of the snapshot or -1 if snapshot is invalid + * @throws IOException if it failed to read from storage + */ + private long load(SingleFileSnapshotInfo snapshot) throws IOException { + //check null + if (snapshot == null) { + LOG.warn("The snapshot info is null."); + return RaftLog.INVALID_LOG_INDEX; + } + //check if the snapshot file exists. + final Path snapshotPath = snapshot.getFile().getPath(); + if (!Files.exists(snapshotPath)) { + LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot); + return RaftLog.INVALID_LOG_INDEX; } + + //read the TermIndex from the snapshot file name + final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile()); + + //read the counter value from the snapshot file + final int counterValue; + try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) { + counterValue = in.readInt(); + } + + //update state + updateState(last, counterValue); + + return last.getIndex(); + } + + // ... } ``` -When loading it, -we could use `ObjectInputStream` to deserialize it. -Remember that we also need to implement `initialize` and `reinitialize` method, -so that the state machine will be correctly initialized. - -## Build and Start a RaftServer -In order to build a raft cluster, -each node must start a `RaftServer` instance, -which is responsible for communicating to each other through Raft protocol. +### Implementing the `initialize` and `reinitialize` methods. +The `initialize` method is called at most once +when the server is starting up. +In contrast, +the `reinitialize` method is called when +1. the server is resumed from the `PAUSE` state, or +2. a new snapshot is installed from the leader or from an external source. + +In `CounterStateMachine`, +the `reinitialize` method simply loads the latest snapshot +and the `initialize` method additionally initializes the `BaseStateMachine` super class and the storage. +```java +public class CounterStateMachine extends BaseStateMachine { + // ... + + /** + * Initialize the state machine storage and then load the state. + * + * @param server the server running this state machine + * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup} + * @param raftStorage the storage of the server + * @throws IOException if it fails to load the state. + */ + @Override + public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException { + super.initialize(server, groupId, raftStorage); + storage.init(raftStorage); + reinitialize(); + } + + /** + * Simply load the state. + * + * @throws IOException if it fails to load the state. + */ + @Override + public void reinitialize() throws IOException { + load(storage.getLatestSnapshot()); + } + + // ... +} +``` +## Preparing a `RaftGroup` +In order to run a raft group, +each server must start a `RaftServer` instance, +which is responsible for communicating to each other through the Raft protocol. It's important to keep in mind that, -each raft server knows exactly how many raft peers are in the cluster, -and what are the addresses of them. -In this example, we'll set a 3 node cluster. +each raft server knows the initial raft group when starting up. +They know the number of raft peers in the group +and the addresses of the peers. + +In this example, we have a raft group with 3 peers. For simplicity, -each peer listens to specific port on the same machine, -and we can define the addresses of the cluster in a configuration file: +each peer listens to a specific port on the same machine. +The addresses of them are defined in a +[property file](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/resources/conf.properties) +as below. ```properties raft.server.address.list=127.0.0.1:10024,127.0.0.1:10124,127.0.0.1:11124 ``` -We name those peers as 'n-0', 'n-1' and 'n-2', -and then we will create a `RaftGroup` instance representing them. -Since they are immutable, -we'll put them in the `Constant` class: +The peers are named as 'n0', 'n1' and 'n2' +and they form a `RaftGroup`. +For more details, see +[Constants.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java). -```java -public final class Constants { - public static final List<RaftPeer> PEERS; - private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1"); - - static { - // load addresses from configuration file - // final String[] addresses = ... - List<RaftPeer> peers = new ArrayList<>(addresses.length); - for (int i = 0; i < addresses.length; i++) { - peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).build()); - } - PEERS = Collections.unmodifiableList(peers); - } - - public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf( - RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS); - // ... -} -``` +## Building & Starting the `CounterServer` -Except for the cluster info, -another important thing is that we need to know the information of the current peer. -To achieve this, -we could pass the current peer's id as a program argument, -and then the raft server could be created: +We use a `RaftServer.Builder` to build a `RaftServer`. +We first set up a `RaftProperties` object +with a local directory as the storage of the server +and a port number as the gRPC server port. +Then, +we create our `CounterStateMachine` +and pass everything to the builder as below. ```java public final class CounterServer implements Closeable { - private final RaftServer server; - - // the current peer will be passed as argument - public CounterServer(RaftPeer peer, File storageDir) throws IOException { - // ... - CounterStateMachine counterStateMachine = new CounterStateMachine(); - - //create and start the Raft server - this.server = RaftServer.newBuilder() - .setGroup(Constants.RAFT_GROUP) - .setProperties(properties) - .setServerId(peer.getId()) - .setStateMachine(counterStateMachine) - .build(); - } + private final RaftServer server; - public void start() throws IOException { - server.start(); - } -} -``` + public CounterServer(RaftPeer peer, File storageDir) throws IOException { + //create a property object + final RaftProperties properties = new RaftProperties(); -Each `RaftServer` will own a `CounterStateMachine` instance, -as previously defined by us. -After that, all we need to do is to start it along with our application: + //set the storage directory (different for each peer) in the RaftProperty object + RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir)); -```java -public final class CounterServer implements Closeable { - // ... - public static void main(String[] args) throws IOException { - // ... - //find current peer object based on application parameter - final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1); - - //start a counter server - final File storageDir = new File("./" + currentPeer.getId()); - final CounterServer counterServer = new CounterServer(currentPeer, storageDir); - counterServer.start(); - // ... - } + //set the port (different for each peer) in RaftProperty object + final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort(); + GrpcConfigKeys.Server.setPort(properties, port); + + //create the counter state machine which holds the counter value + final CounterStateMachine counterStateMachine = new CounterStateMachine(); + + //build the Raft server + this.server = RaftServer.newBuilder() + .setGroup(Constants.RAFT_GROUP) + .setProperties(properties) + .setServerId(peer.getId()) + .setStateMachine(counterStateMachine) + .build(); + } + // ... } ``` -After the server is started, -it will try to communicate with other peers in the cluster, -and perform raft actions like leader election, append log entries, etc. +Now we are ready to start our `CounterServer` peers and form a raft group. +The command is: +```shell +java org.apache.ratis.examples.counter.server.CounterServer peer_index +``` +The argument `peer_index` must be 0, 1 or 2. + +After a server is started, +it communicates with other peers in the group, +and performs raft actions such as leader election and append-log-entries. +After all three servers are started, +the counter service is up and running with the Raft protocol. -## Build Raft Client +For more details, see +[CounterServer.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java). -To send commands to the cluster, -we need to use a `RaftClient` instance. -All we need to know is the peers in the cluster, ie. the raft group. +## Building & Running the `CounterClient` + +We use a `RaftGroup` to build a `RaftClient` +and then use the `RaftClient` to send commands to the raft service. +Note that gRPC is the default RPC type +so that we may skip setting it in the `RaftProperties`. ```java -public final class CounterClient { - // ... - private static RaftClient buildClient() { - RaftProperties raftProperties = new RaftProperties(); - RaftClient.Builder builder = RaftClient.newBuilder() - .setProperties(raftProperties) - .setRaftGroup(Constants.RAFT_GROUP) - .setClientRpc( - new GrpcFactory(new Parameters()) - .newRaftClientRpc(ClientId.randomId(), raftProperties)); - return builder.build(); - } +public final class CounterClient implements Closeable { + private final RaftClient client = RaftClient.newBuilder() + .setProperties(new RaftProperties()) + .setRaftGroup(Constants.RAFT_GROUP) + .build(); + + // ... } ``` With this raft client, -we can then send commands by `raftClient.io().send` method, -and use `raftClient.io().sendReadonly` method for read only commands. -In this example, -to send `INCREMENT` and `GET` command, -we can do it like this: - +we can then send commands using the `BlockingApi` returned by `RaftClient.io()`, +or the `AsyncApi` returned by `RaftClient.async()`. +The `send` method in the `BlockingApi`/`AsyncApi` is used to send the `INCREMENT` command as below. ```java -raftClient.io().send(Message.valueOf("INCREMENT"))); +client.io().send(CounterCommand.INCREMENT.getMessage()); ``` - -and - +or ```java -RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET")); -String response = count.getMessage().getContent().toString(Charset.defaultCharset()); -System.out.println(response); +client.async().send(CounterCommand.INCREMENT.getMessage()); ``` - -## Summary -It might seem a little complicated for beginners, -but since Raft itself is a hard topic, -this is already the simplest example we've found as a 'Hello World' for Ratis. -After you have a basic understanding of Ratis, -you'll find it really easy to be integrated into any projects. - -Next, you can take a look at other [examples](https://github.com/apache/ratis/tree/master/ratis-examples), -to know more about the features of Ratis. \ No newline at end of file +The `sendReadonly` method in the `BlockingApi`/`AsyncApi` is used to send the `GET` command as below. +```java +client.io().sendReadOnly(CounterCommand.GET.getMessage()); +``` +or +```java +client.async().sendReadOnly(CounterCommand.GET.getMessage()); +``` +For more details, see +[CounterClient.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java). \ No newline at end of file diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java index 9da64092d..9fa873fc2 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java @@ -71,10 +71,9 @@ public final class Constants { PEERS = Collections.unmodifiableList(peers); } - private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1"); + private static final UUID GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1"); - public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf( - RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS); + public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(RaftGroupId.valueOf(Constants.GROUP_ID), PEERS); private Constants() { } diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java new file mode 100644 index 000000000..843a158fb --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.examples.counter; + +import org.apache.ratis.protocol.Message; + +/** + * The supported commands the Counter example. + */ +public enum CounterCommand { + /** Increment the counter by 1. */ + INCREMENT, + /** Get the counter value. */ + GET; + + private final Message message = Message.valueOf(name()); + + public Message getMessage() { + return message; + } + + /** Does the given command string match this command? */ + public boolean matches(String command) { + return name().equalsIgnoreCase(command); + } +} diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java index a3f28cb6a..c0350ec72 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,24 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.ratis.examples.counter.client; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.examples.common.Constants; -import org.apache.ratis.grpc.GrpcFactory; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; +import org.apache.ratis.examples.counter.CounterCommand; import org.apache.ratis.protocol.RaftClientReply; +import java.io.Closeable; import java.io.IOException; -import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; /** * Counter client application, this application sends specific number of @@ -42,55 +40,75 @@ import java.util.concurrent.TimeUnit; * Parameter to this application indicate the number of INCREMENT command, if no * parameter found, application use default value which is 10 */ -public final class CounterClient { +public final class CounterClient implements Closeable { + //build the client + private final RaftClient client = RaftClient.newBuilder() + .setProperties(new RaftProperties()) + .setRaftGroup(Constants.RAFT_GROUP) + .build(); - private CounterClient(){ + @Override + public void close() throws IOException { + client.close(); } - @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE") - public static void main(String[] args) - throws IOException, InterruptedException { - //indicate the number of INCREMENT command, set 10 if no parameter passed - int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10; - - //build the counter cluster client - RaftClient raftClient = buildClient(); + private void run(int increment, boolean blocking) throws Exception { + System.out.printf("Sending %d %s command(s) using the %s ...%n", + increment, CounterCommand.INCREMENT, blocking? "BlockingApi": "AsyncApi"); + final List<Future<RaftClientReply>> futures = new ArrayList<>(increment); - //use a executor service with 10 thread to send INCREMENT commands - // concurrently - ExecutorService executorService = Executors.newFixedThreadPool(10); - - //send INCREMENT commands concurrently - System.out.printf("Sending %d increment command...%n", increment); - for (int i = 0; i < increment; i++) { - executorService.submit(() -> - raftClient.io().send(Message.valueOf("INCREMENT"))); + //send INCREMENT command(s) + if (blocking) { + // use BlockingApi + final ExecutorService executor = Executors.newFixedThreadPool(10); + for (int i = 0; i < increment; i++) { + final Future<RaftClientReply> f = executor.submit( + () -> client.io().send(CounterCommand.INCREMENT.getMessage())); + futures.add(f); + } + executor.shutdown(); + } else { + // use AsyncApi + for (int i = 0; i < increment; i++) { + final Future<RaftClientReply> f = client.async().send(CounterCommand.INCREMENT.getMessage()); + futures.add(f); + } } - //shutdown the executor service and wait until they finish their work - executorService.shutdown(); - executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS); + //wait for the futures + for (Future<RaftClientReply> f : futures) { + final RaftClientReply reply = f.get(); + if (reply.isSuccess()) { + final String count = reply.getMessage().getContent().toStringUtf8(); + System.out.println("Counter is incremented to " + count); + } else { + System.err.println("Failed " + reply); + } + } - //send GET command and print the response - RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET")); - String response = count.getMessage().getContent().toString(Charset.defaultCharset()); - System.out.println(response); + //send a GET command and print the reply + final RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage()); + final String count = reply.getMessage().getContent().toStringUtf8(); + System.out.println("Current counter value: " + count); } - /** - * build the RaftClient instance which is used to communicate to - * Counter cluster - * - * @return the created client of Counter cluster - */ - private static RaftClient buildClient() { - RaftProperties raftProperties = new RaftProperties(); - RaftClient.Builder builder = RaftClient.newBuilder() - .setProperties(raftProperties) - .setRaftGroup(Constants.RAFT_GROUP) - .setClientRpc( - new GrpcFactory(new Parameters()) - .newRaftClientRpc(ClientId.randomId(), raftProperties)); - return builder.build(); + public static void main(String[] args) { + try(CounterClient client = new CounterClient()) { + //the number of INCREMENT commands, default is 10 + final int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10; + final boolean io = args.length > 1 && "io".equalsIgnoreCase(args[1]); + client.run(increment, io); + } catch (Throwable e) { + e.printStackTrace(); + System.err.println(); + System.err.println("args = " + Arrays.toString(args)); + System.err.println(); + System.err.println("Usage: java org.apache.ratis.examples.counter.client.CounterClient [increment] [async|io]"); + System.err.println(); + System.err.println(" increment: the number of INCREMENT commands to be sent (default is 10)"); + System.err.println(" async : use the AsyncApi (default)"); + System.err.println(" io : use the BlockingApi"); + System.exit(1); + } } } diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java index b6fc8c7d8..7a6367ece 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.ratis.examples.counter.server; import org.apache.ratis.conf.RaftProperties; @@ -29,6 +28,7 @@ import org.apache.ratis.util.NetUtils; import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.Scanner; @@ -48,19 +48,19 @@ public final class CounterServer implements Closeable { public CounterServer(RaftPeer peer, File storageDir) throws IOException { //create a property object - RaftProperties properties = new RaftProperties(); + final RaftProperties properties = new RaftProperties(); - //set the storage directory (different for each peer) in RaftProperty object + //set the storage directory (different for each peer) in the RaftProperty object RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir)); - //set the port which server listen to in RaftProperty object + //set the port (different for each peer) in RaftProperty object final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort(); GrpcConfigKeys.Server.setPort(properties, port); - //create the counter state machine which hold the counter value - CounterStateMachine counterStateMachine = new CounterStateMachine(); + //create the counter state machine which holds the counter value + final CounterStateMachine counterStateMachine = new CounterStateMachine(); - //create and start the Raft server + //build the Raft server this.server = RaftServer.newBuilder() .setGroup(Constants.RAFT_GROUP) .setProperties(properties) @@ -78,24 +78,41 @@ public final class CounterServer implements Closeable { server.close(); } - public static void main(String[] args) throws IOException { - if (args.length < 1) { - System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}"); - System.err.println("{serverIndex} could be 1, 2 or 3"); + public static void main(String[] args) { + try { + //get peerIndex from the arguments + if (args.length != 1) { + throw new IllegalArgumentException("Invalid argument number: expected to be 1 but actual is " + args.length); + } + final int peerIndex = Integer.parseInt(args[0]); + if (peerIndex < 0 || peerIndex > 2) { + throw new IllegalArgumentException("The server index must be 0, 1 or 2: peerIndex=" + peerIndex); + } + + startServer(peerIndex); + } catch(Throwable e) { + e.printStackTrace(); + System.err.println(); + System.err.println("args = " + Arrays.toString(args)); + System.err.println(); + System.err.println("Usage: java org.apache.ratis.examples.counter.server.CounterServer peer_index"); + System.err.println(); + System.err.println(" peer_index must be 0, 1 or 2"); System.exit(1); } + } - //find current peer object based on application parameter - final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1); + private static void startServer(int peerIndex) throws IOException { + //get peer and define storage dir + final RaftPeer currentPeer = Constants.PEERS.get(peerIndex); + final File storageDir = new File("./" + currentPeer.getId()); //start a counter server - final File storageDir = new File("./" + currentPeer.getId()); - final CounterServer counterServer = new CounterServer(currentPeer, storageDir); - counterServer.start(); + try(CounterServer counterServer = new CounterServer(currentPeer, storageDir)) { + counterServer.start(); - //exit when any input entered - Scanner scanner = new Scanner(System.in, UTF_8.name()); - scanner.nextLine(); - counterServer.close(); + //exit when any input entered + new Scanner(System.in, UTF_8.name()).nextLine(); + } } } diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java index 7159ec1c5..d5a027910 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,10 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.ratis.examples.counter.server; -import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.examples.counter.CounterCommand; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; @@ -34,52 +35,81 @@ import org.apache.ratis.util.JavaUtils; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; /** - * State machine implementation for Counter server application. This class - * maintain a {@link AtomicInteger} object as a state and accept two commands: - * GET and INCREMENT, GET is a ReadOnly command which will be handled by - * {@code query} method however INCREMENT is a transactional command which - * will be handled by {@code applyTransaction}. + * A {@link org.apache.ratis.statemachine.StateMachine} implementation for the {@link CounterServer}. + * This class maintain a {@link AtomicInteger} object as a state and accept two commands: + * + * - {@link CounterCommand#GET} is a readonly command + * which is handled by the {@link #query(Message)} method. + * + * - {@link CounterCommand#INCREMENT} is a transactional command + * which is handled by the {@link #applyTransaction(TransactionContext)} method. */ public class CounterStateMachine extends BaseStateMachine { - private final SimpleStateMachineStorage storage = - new SimpleStateMachineStorage(); - private AtomicInteger counter = new AtomicInteger(0); + /** The state of the {@link CounterStateMachine}. */ + static class CounterState { + private final TermIndex applied; + private final int counter; + + CounterState(TermIndex applied, int counter) { + this.applied = applied; + this.counter = counter; + } + + TermIndex getApplied() { + return applied; + } + + int getCounter() { + return counter; + } + } + + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); + private final AtomicInteger counter = new AtomicInteger(0); + + /** @return the current state. */ + private synchronized CounterState getState() { + return new CounterState(getLastAppliedTermIndex(), counter.get()); + } + + private synchronized void updateState(TermIndex applied, int counterValue) { + updateLastAppliedTermIndex(applied); + counter.set(counterValue); + } + + private synchronized int incrementCounter(TermIndex termIndex) { + updateLastAppliedTermIndex(termIndex); + return counter.incrementAndGet(); + } /** - * initialize the state machine by initilize the state machine storage and - * calling the load method which reads the last applied command and restore it - * in counter object) + * Initialize the state machine storage and then load the state. * - * @param server the current server information - * @param groupId the cluster groupId - * @param raftStorage the raft storage which is used to keep raft related - * stuff - * @throws IOException if any error happens during load state + * @param server the server running this state machine + * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup} + * @param raftStorage the storage of the server + * @throws IOException if it fails to load the state. */ @Override - public void initialize(RaftServer server, RaftGroupId groupId, - RaftStorage raftStorage) throws IOException { + public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException { super.initialize(server, groupId, raftStorage); - this.storage.init(raftStorage); - load(storage.getLatestSnapshot()); + storage.init(raftStorage); + reinitialize(); } /** - * very similar to initialize method, but doesn't initialize the storage - * system because the state machine reinitialized from the PAUSE state and - * storage system initialized before. + * Simply load the latest snapshot. * - * @throws IOException if any error happens during load state + * @throws IOException if it fails to load the state. */ @Override public void reinitialize() throws IOException { @@ -87,124 +117,107 @@ public class CounterStateMachine extends BaseStateMachine { } /** - * Store the current state as an snapshot file in the stateMachineStorage. + * Store the current state as a snapshot file in the {@link #storage}. * * @return the index of the snapshot */ @Override public long takeSnapshot() { - //get the last applied index - final TermIndex last = getLastAppliedTermIndex(); + //get the current state + final CounterState state = getState(); + final long index = state.getApplied().getIndex(); //create a file with a proper name to store the snapshot - final File snapshotFile = - storage.getSnapshotFile(last.getTerm(), last.getIndex()); + final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index); - //serialize the counter object and write it into the snapshot file - try (ObjectOutputStream out = new ObjectOutputStream( - new BufferedOutputStream(new FileOutputStream(snapshotFile)))) { - out.writeObject(counter); + //write the counter value into the snapshot file + try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream( + Files.newOutputStream(snapshotFile.toPath())))) { + out.writeInt(state.getCounter()); } catch (IOException ioe) { LOG.warn("Failed to write snapshot file \"" + snapshotFile - + "\", last applied index=" + last); + + "\", last applied index=" + state.getApplied()); } //return the index of the stored snapshot (which is the last applied one) - return last.getIndex(); + return index; } /** - * Load the state of the state machine from the storage. + * Load the state of the state machine from the {@link #storage}. * - * @param snapshot to load + * @param snapshot the information of the snapshot being loaded * @return the index of the snapshot or -1 if snapshot is invalid - * @throws IOException if any error happens during read from storage + * @throws IOException if it failed to read from storage */ private long load(SingleFileSnapshotInfo snapshot) throws IOException { - //check the snapshot nullity + //check null if (snapshot == null) { LOG.warn("The snapshot info is null."); return RaftLog.INVALID_LOG_INDEX; } - - //check the existance of the snapshot file - final File snapshotFile = snapshot.getFile().getPath().toFile(); - if (!snapshotFile.exists()) { - LOG.warn("The snapshot file {} does not exist for snapshot {}", - snapshotFile, snapshot); + //check if the snapshot file exists. + final Path snapshotPath = snapshot.getFile().getPath(); + if (!Files.exists(snapshotPath)) { + LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot); return RaftLog.INVALID_LOG_INDEX; } - //load the TermIndex object for the snapshot using the file name pattern of - // the snapshot - final TermIndex last = - SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); - - //read the file and cast it to the AtomicInteger and set the counter - try (ObjectInputStream in = new ObjectInputStream( - new BufferedInputStream(new FileInputStream(snapshotFile)))) { - //set the last applied termIndex to the termIndex of the snapshot - setLastAppliedTermIndex(last); - - //read, cast and set the counter - counter = JavaUtils.cast(in.readObject()); - } catch (ClassNotFoundException e) { - throw new IllegalStateException(e); + //read the TermIndex from the snapshot file name + final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile()); + + //read the counter value from the snapshot file + final int counterValue; + try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) { + counterValue = in.readInt(); } + //update state + updateState(last, counterValue); + return last.getIndex(); } /** - * Handle GET command, which used by clients to get the counter value. + * Process {@link CounterCommand#GET}, which gets the counter value. * - * @param request the GET message - * @return the Message containing the current counter value + * @param request the GET request + * @return a {@link Message} containing the current counter value as a {@link String}. */ @Override public CompletableFuture<Message> query(Message request) { - String msg = request.getContent().toString(Charset.defaultCharset()); - if (!msg.equals("GET")) { - return CompletableFuture.completedFuture( - Message.valueOf("Invalid Command")); + final String command = request.getContent().toStringUtf8(); + if (!CounterCommand.GET.matches(command)) { + return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command)); } - return CompletableFuture.completedFuture( - Message.valueOf(counter.toString())); + return CompletableFuture.completedFuture(Message.valueOf(counter.toString())); } /** - * Apply the INCREMENT command by incrementing the counter object. + * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object. * * @param trx the transaction context * @return the message containing the updated counter value */ @Override public CompletableFuture<Message> applyTransaction(TransactionContext trx) { - final RaftProtos.LogEntryProto entry = trx.getLogEntry(); + final LogEntryProto entry = trx.getLogEntry(); //check if the command is valid - String logData = entry.getStateMachineLogEntry().getLogData() - .toString(Charset.defaultCharset()); - if (!logData.equals("INCREMENT")) { - return CompletableFuture.completedFuture( - Message.valueOf("Invalid Command")); + final String command = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); + if (!CounterCommand.INCREMENT.matches(command)) { + return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command)); } - //update the last applied term and index - final long index = entry.getIndex(); - updateLastAppliedTermIndex(entry.getTerm(), index); - - //actual execution of the command: increment the counter - counter.incrementAndGet(); + //increment the counter and update term-index + final TermIndex termIndex = TermIndex.valueOf(entry); + final long incremented = incrementCounter(termIndex); - //return the new value of the counter to the client - final CompletableFuture<Message> f = - CompletableFuture.completedFuture(Message.valueOf(counter.toString())); - - //if leader, log the incremented value and it's log index - if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) { - LOG.info("{}: Increment to {}", index, counter.toString()); + //if leader, log the incremented value and the term-index + if (trx.getServerRole() == RaftPeerRole.LEADER) { + LOG.info("{}: Increment to {}", termIndex, incremented); } - return f; + //return the new value of the counter to the client + return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented))); } } diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java index e63789117..953fd5bfa 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java @@ -29,7 +29,6 @@ import org.junit.Test; import org.junit.runners.Parameterized; import java.io.IOException; -import java.nio.charset.Charset; import java.util.Collection; public class TestCounter extends ParameterizedBaseTest { @@ -50,20 +49,17 @@ public class TestCounter extends ParameterizedBaseTest { client.io().send(Message.valueOf("INCREMENT")); } RaftClientReply reply1 = client.io().sendReadOnly(Message.valueOf("GET")); - Assert.assertEquals("10", - reply1.getMessage().getContent().toString(Charset.defaultCharset())); + Assert.assertEquals("10", reply1.getMessage().getContent().toStringUtf8()); for (int i = 0; i < 10; i++) { client.io().send(Message.valueOf("INCREMENT")); } RaftClientReply reply2 = client.io().sendReadOnly(Message.valueOf("GET")); - Assert.assertEquals("20", - reply2.getMessage().getContent().toString(Charset.defaultCharset())); + Assert.assertEquals("20", reply2.getMessage().getContent().toStringUtf8()); for (int i = 0; i < 10; i++) { client.io().send(Message.valueOf("INCREMENT")); } RaftClientReply reply3 = client.io().sendReadOnly(Message.valueOf("GET")); - Assert.assertEquals("30", - reply3.getMessage().getContent().toString(Charset.defaultCharset())); + Assert.assertEquals("30", reply3.getMessage().getContent().toStringUtf8()); } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java index 9cc4c5b21..629a55a67 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java @@ -132,9 +132,12 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi, updateLastAppliedTermIndex(term, index); } - @SuppressFBWarnings("NP_NULL_PARAM_DEREF") protected boolean updateLastAppliedTermIndex(long term, long index) { - final TermIndex newTI = TermIndex.valueOf(term, index); + return updateLastAppliedTermIndex(TermIndex.valueOf(term, index)); + } + + protected boolean updateLastAppliedTermIndex(TermIndex newTI) { + Objects.requireNonNull(newTI, "newTI == null"); final TermIndex oldTI = lastAppliedTermIndex.getAndSet(newTI); if (!newTI.equals(oldTI)) { LOG.trace("{}: update lastAppliedTermIndex from {} to {}", getId(), oldTI, newTI); @@ -147,7 +150,7 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi, } synchronized (transactionFutures) { - for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= index; ) { + for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= newTI.getIndex(); ) { transactionFutures.remove(i).complete(null); } }
