szetszwo commented on code in PR #1036:
URL: https://github.com/apache/ratis/pull/1036#discussion_r1474891309
##########
ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java:
##########
@@ -101,11 +103,30 @@ default CompletableFuture<?> write(LogEntryProto entry) {
* Write asynchronously the state machine data in the given log entry to this state machine.
*
* @return a future for the write task
+ * @deprecated Applications should implement {@link #write(ReferenceCountedObject, TransactionContext)} instead.
*/
+ @Deprecated
default CompletableFuture<?> write(LogEntryProto entry, TransactionContext context) {
return write(entry);
}
+ /**
+ * Write asynchronously the state machine data in the given log entry to this state machine.
+ * Implementations of this method should use {@link ReferenceCountedObject#get()} to use the log entry within the
+ * scope of this method. If they need to use the log entry in an extended scope, e.g. for asynchronous computation
+ * or cache, the entryRef should be retained and released.
Review Comment:
Let's move this part to `@param entryRef`.
```java
 * @param entryRef Reference to a log entry.
 *                 Implementations of this method may call {@link ReferenceCountedObject#get()}
 *                 to access the log entry before this method returns.
 *                 If the log entry is needed after this method returns,
 *                 e.g. for asynchronous computation or caching,
 *                 the implementation must invoke {@link ReferenceCountedObject#retain()}
 *                 and {@link ReferenceCountedObject#release()}.
```
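To illustrate the contract described in the suggested javadoc, here is a minimal sketch of the two usage patterns. It is not Ratis code: `Ref` and `CountingRef` are hypothetical stand-ins that model only `get()`, `retain()` and `release()`, and a `String` stands in for the log entry, so that the example is self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class RetainReleaseSketch {
  /** Hypothetical stand-in for a reference-counted object; not the Ratis interface. */
  interface Ref<T> {
    T get();
    T retain();
    boolean release();
  }

  /** Simple counting implementation used only for this illustration. */
  static final class CountingRef<T> implements Ref<T> {
    private final T value;
    private final AtomicInteger count = new AtomicInteger();

    CountingRef(T value) {
      this.value = value;
    }

    @Override
    public T get() {
      return value;
    }

    @Override
    public T retain() {
      count.incrementAndGet();
      return value;
    }

    @Override
    public boolean release() {
      // Returns true when the last reference has been released.
      return count.decrementAndGet() == 0;
    }

    int count() {
      return count.get();
    }
  }

  /** The entry is used only before the method returns, so get() is enough. */
  static CompletableFuture<Integer> writeSync(Ref<String> entryRef) {
    return CompletableFuture.completedFuture(entryRef.get().length());
  }

  /** The entry is used after the method returns, so it is retained first and released when the task completes. */
  static CompletableFuture<Integer> writeAsync(Ref<String> entryRef) {
    final String entry = entryRef.retain();
    return CompletableFuture.supplyAsync(entry::length)
        .whenComplete((result, error) -> entryRef.release());
  }

  public static void main(String[] args) {
    final CountingRef<String> ref = new CountingRef<>("entry");
    ref.retain(); // the caller holds its own reference
    System.out.println(writeSync(ref).join());
    System.out.println(writeAsync(ref).join());
    System.out.println(ref.count()); // only the caller's reference remains
    ref.release();
  }
}
```

In the asynchronous case the release is attached with `whenComplete`, so it runs whether the task succeeds or fails.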
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java:
##########
@@ -486,25 +487,28 @@ private class WriteLog extends Task {
private final CompletableFuture<?> stateMachineFuture;
private final CompletableFuture<Long> combined;
- WriteLog(LogEntryProto entry, LogEntryProto removedStateMachineData, TransactionContext context) {
+ WriteLog(ReferenceCountedObject<LogEntryProto> entryRef, LogEntryProto removedStateMachineData,
+ TransactionContext context) {
+ LogEntryProto origEntry = entryRef.get();
this.entry = removedStateMachineData;
- if (this.entry == entry) {
- final StateMachineLogEntryProto proto = entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
+ if (this.entry == origEntry) {
+ final StateMachineLogEntryProto proto = origEntry.hasStateMachineLogEntry()?
+ origEntry.getStateMachineLogEntry(): null;
if (stateMachine != null && proto != null && proto.getType() == StateMachineLogEntryProto.Type.DATASTREAM) {
final ClientInvocationId invocationId = ClientInvocationId.valueOf(proto);
final CompletableFuture<DataStream> removed = server.getDataStreamMap().remove(invocationId);
- this.stateMachineFuture = removed == null? stateMachine.data().link(null, entry)
- : removed.thenApply(stream -> stateMachine.data().link(stream, entry));
+ this.stateMachineFuture = removed == null? stateMachine.data().link(null, origEntry)
+ : removed.thenApply(stream -> stateMachine.data().link(stream, origEntry));
} else {
this.stateMachineFuture = null;
}
} else {
try {
// this.entry != entry iff the entry has state machine data
Review Comment:
This comment needs to be updated:
```java
// this.entry != origEntry iff it has state machine data
```
##########
ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java:
##########
@@ -101,11 +103,30 @@ default CompletableFuture<?> write(LogEntryProto entry) {
* Write asynchronously the state machine data in the given log entry to this state machine.
*
* @return a future for the write task
+ * @deprecated Applications should implement {@link #write(ReferenceCountedObject, TransactionContext)} instead.
*/
+ @Deprecated
default CompletableFuture<?> write(LogEntryProto entry, TransactionContext context) {
return write(entry);
}
+ /**
+ * Write asynchronously the state machine data in the given log entry to this state machine.
+ * Implementations of this method should use {@link ReferenceCountedObject#get()} to use the log entry within the
+ * scope of this method. If they need to use the log entry in an extended scope, e.g. for asynchronous computation
+ * or cache, the entryRef should be retained and released.
+ *
+ * @return a future for the write task
+ */
+ default CompletableFuture<?> write(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext context) {
+ try {
+ // for backward compatibility, the log entry is copied to be decoupled from entryRef.
+ return write(copy(entryRef.retain()), context);
+ } finally {
+ entryRef.release();
+ }
+ }
Review Comment:
Since `StateMachine` is a public API, let's inline the `copy` logic here.
Otherwise, the `copy` method also becomes part of the API.
BTW, according to the javadoc, we may use the `get()` method here since the entry is copied before this method returns.
```java
default CompletableFuture<?> write(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext context) {
  final LogEntryProto entry = entryRef.get();
  try {
    final LogEntryProto copy = LogEntryProto.parseFrom(entry.toByteString());
    return write(copy, context);
  } catch (InvalidProtocolBufferException e) {
    return JavaUtils.completeExceptionally(new IllegalStateException(
        "Failed to copy log entry " + TermIndex.valueOf(entry), e));
  }
}
```