This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 775b286c4 RATIS-1997. Refactor StateMachine interface to use
ReferenceCountedObject (#1036)
775b286c4 is described below
commit 775b286c4540057e5f81419806d9e7737e5f568e
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Feb 1 15:39:48 2024 -0800
RATIS-1997. Refactor StateMachine interface to use ReferenceCountedObject
(#1036)
---
.../examples/filestore/FileStoreStateMachine.java | 9 +++++---
.../apache/ratis/statemachine/StateMachine.java | 27 ++++++++++++++++++++++
.../server/raftlog/segmented/SegmentedRaftLog.java | 2 +-
.../raftlog/segmented/SegmentedRaftLogWorker.java | 24 +++++++++++--------
.../impl/SimpleStateMachine4Testing.java | 4 +++-
.../raftlog/segmented/TestSegmentedRaftLog.java | 3 ++-
6 files changed, 53 insertions(+), 16 deletions(-)
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 858e300ec..0ee7a60ac 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -42,6 +42,7 @@ import
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
import java.io.IOException;
import java.nio.file.Path;
@@ -123,7 +124,8 @@ public class FileStoreStateMachine extends BaseStateMachine
{
}
@Override
- public CompletableFuture<Integer> write(LogEntryProto entry,
TransactionContext context) {
+ public CompletableFuture<Integer>
write(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext
context) {
+ LogEntryProto entry = entryRef.retain();
final FileStoreRequestProto proto = getProto(context, entry);
if (proto.getRequestCase() !=
FileStoreRequestProto.RequestCase.WRITEHEADER) {
return null;
@@ -132,9 +134,10 @@ public class FileStoreStateMachine extends
BaseStateMachine {
final WriteRequestHeaderProto h = proto.getWriteHeader();
final CompletableFuture<Integer> f = files.write(entry.getIndex(),
h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(),
-
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData());
+
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData()
+ ).whenComplete((r, e) -> entryRef.release());
// sync only if closing the file
- return h.getClose()? f: null;
+ return h.getClose() ? f: null;
}
static FileStoreRequestProto getProto(TransactionContext context,
LogEntryProto entry) {
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index b1fc5adda..915b70bb8 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -92,7 +92,9 @@ public interface StateMachine extends Closeable {
* Write asynchronously the state machine data in the given log entry to
this state machine.
*
* @return a future for the write task
+ * @deprecated Applications should implement {@link
#write(ReferenceCountedObject, TransactionContext)} instead.
*/
+ @Deprecated
default CompletableFuture<?> write(LogEntryProto entry) {
return CompletableFuture.completedFuture(null);
}
@@ -101,11 +103,36 @@ public interface StateMachine extends Closeable {
* Write asynchronously the state machine data in the given log entry to
this state machine.
*
* @return a future for the write task
+ * @deprecated Applications should implement {@link
#write(ReferenceCountedObject, TransactionContext)} instead.
*/
+ @Deprecated
default CompletableFuture<?> write(LogEntryProto entry, TransactionContext
context) {
return write(entry);
}
+ /**
+ * Write asynchronously the state machine data in the given log entry to
this state machine.
+ *
+ * @param entryRef Reference to a log entry.
+ * Implementations of this method may call {@link
ReferenceCountedObject#get()}
+ * to access the log entry before this method returns.
+ * If the log entry is needed after this method returns,
+ * e.g. for asynchronous computation or caching,
+ * the implementation must invoke {@link
ReferenceCountedObject#retain()}
+ * and {@link ReferenceCountedObject#release()}.
+ * @return a future for the write task
+ */
+ default CompletableFuture<?> write(ReferenceCountedObject<LogEntryProto>
entryRef, TransactionContext context) {
+ final LogEntryProto entry = entryRef.get();
+ try {
+ final LogEntryProto copy =
LogEntryProto.parseFrom(entry.toByteString());
+ return write(copy, context);
+ } catch (InvalidProtocolBufferException e) {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "Failed to copy log entry " + TermIndex.valueOf(entry), e));
+ }
+ }
+
/**
* Create asynchronously a {@link DataStream} to stream state machine data.
* The state machine may use the first message (i.e. request.getMessage())
as the header to create the stream.
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 4e057c07b..baac0c6c7 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -428,7 +428,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
// If the entry has state machine data, then the entry should be inserted
// to statemachine first and then to the cache. Not following the order
// will leave a spurious entry in the cache.
- final Task write = fileLogWorker.writeLogEntry(entry,
removedStateMachineData, context);
+ final Task write = fileLogWorker.writeLogEntry(entryRef,
removedStateMachineData, context);
if (stateMachineCachingEnabled) {
// The stateMachineData will be cached inside the StateMachine itself.
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 0d1ea763b..02506079f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -438,7 +438,8 @@ class SegmentedRaftLogWorker {
addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
}
- Task writeLogEntry(LogEntryProto entry, LogEntryProto
removedStateMachineData, TransactionContext context) {
+ Task writeLogEntry(ReferenceCountedObject<LogEntryProto> entry,
+ LogEntryProto removedStateMachineData, TransactionContext context) {
return addIOTask(new WriteLog(entry, removedStateMachineData, context));
}
@@ -486,25 +487,28 @@ class SegmentedRaftLogWorker {
private final CompletableFuture<?> stateMachineFuture;
private final CompletableFuture<Long> combined;
- WriteLog(LogEntryProto entry, LogEntryProto removedStateMachineData,
TransactionContext context) {
+ WriteLog(ReferenceCountedObject<LogEntryProto> entryRef, LogEntryProto
removedStateMachineData,
+ TransactionContext context) {
+ LogEntryProto origEntry = entryRef.get();
this.entry = removedStateMachineData;
- if (this.entry == entry) {
- final StateMachineLogEntryProto proto =
entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
+ if (this.entry == origEntry) {
+ final StateMachineLogEntryProto proto =
origEntry.hasStateMachineLogEntry() ?
+ origEntry.getStateMachineLogEntry(): null;
if (stateMachine != null && proto != null && proto.getType() ==
StateMachineLogEntryProto.Type.DATASTREAM) {
final ClientInvocationId invocationId =
ClientInvocationId.valueOf(proto);
final CompletableFuture<DataStream> removed =
server.getDataStreamMap().remove(invocationId);
- this.stateMachineFuture = removed == null?
stateMachine.data().link(null, entry)
- : removed.thenApply(stream -> stateMachine.data().link(stream,
entry));
+ this.stateMachineFuture = removed == null?
stateMachine.data().link(null, origEntry)
+ : removed.thenApply(stream -> stateMachine.data().link(stream,
origEntry));
} else {
this.stateMachineFuture = null;
}
} else {
try {
- // this.entry != entry iff the entry has state machine data
- this.stateMachineFuture = stateMachine.data().write(entry, context);
+ // this.entry != origEntry if it has state machine data
+ this.stateMachineFuture = stateMachine.data().write(entryRef,
context);
} catch (Exception e) {
- LOG.error(name + ": writeStateMachineData failed for index " +
entry.getIndex()
- + ", entry=" + LogProtoUtils.toLogEntryString(entry,
stateMachine::toStateMachineLogEntryString), e);
+ LOG.error(name + ": writeStateMachineData failed for index " +
origEntry.getIndex()
+ + ", entry=" + LogProtoUtils.toLogEntryString(origEntry,
stateMachine::toStateMachineLogEntryString), e);
throw e;
}
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 7c40ec251..17d5a607a 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -48,6 +48,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -367,7 +368,8 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
}
@Override
- public CompletableFuture<Void> write(LogEntryProto entry) {
+ public CompletableFuture<Void> write(ReferenceCountedObject<LogEntryProto>
entry, TransactionContext context) {
+ Preconditions.assertTrue(entry.get() != null);
return blocking.getFuture(Blocking.Type.WRITE_STATE_MACHINE_DATA);
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 3d5d5f87d..38341e025 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -39,6 +39,7 @@ import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -634,7 +635,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final LogEntryProto entry = prepareLogEntry(0, 0, null, true);
final StateMachine sm = new BaseStateMachine() {
@Override
- public CompletableFuture<Void> write(LogEntryProto entry) {
+ public CompletableFuture<Void>
write(ReferenceCountedObject<LogEntryProto> entry, TransactionContext context) {
getLifeCycle().transition(LifeCycle.State.STARTING);
getLifeCycle().transition(LifeCycle.State.RUNNING);