This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c73a3eb8c RATIS-1979. Allow StateMachine.read to return a
ReferenceCountedObject (#1062)
c73a3eb8c is described below
commit c73a3eb8c026133e0e9b8ada1a2aeb467812cb37
Author: Duong Nguyen <[email protected]>
AuthorDate: Tue Apr 2 07:09:24 2024 -0700
RATIS-1979. Allow StateMachine.read to return a ReferenceCountedObject
(#1062)
---
.../apache/ratis/examples/filestore/FileInfo.java | 3 +-
.../apache/ratis/statemachine/StateMachine.java | 22 ++++++++
.../apache/ratis/server/raftlog/LogProtoUtils.java | 10 +++-
.../apache/ratis/server/raftlog/RaftLogBase.java | 62 +++++++++++++++++++---
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 2 +-
.../ratis/server/raftlog/segmented/LogSegment.java | 2 +-
.../server/raftlog/segmented/SegmentedRaftLog.java | 10 ++--
7 files changed, 95 insertions(+), 16 deletions(-)
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index c7d8cb7cd..bba001002 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -19,6 +19,7 @@ package org.apache.ratis.examples.filestore;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
@@ -84,7 +85,7 @@ abstract class FileInfo {
final ByteBuffer buffer =
ByteBuffer.allocateDirect(FileStoreCommon.getChunkSize(length));
in.position(offset).read(buffer);
buffer.flip();
- return ByteString.copyFrom(buffer);
+ return UnsafeByteOperations.unsafeWrap(buffer);
}
}
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 915b70bb8..b68c724ab 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -88,6 +88,28 @@ public interface StateMachine extends Closeable {
return read(entry);
}
+ /**
+ * Read asynchronously the state machine data from this state machine.
+ * StateMachines implement this method when the read result contains retained resources that should be released
+ * after use.
+ *
+ * @return a future for the read task. The result of the future is a {@link ReferenceCountedObject} wrapping the
+ * read result. Client code of this method must call {@link ReferenceCountedObject#release()} after
+ * use.
+ default CompletableFuture<ReferenceCountedObject<ByteString>>
retainRead(LogEntryProto entry,
+ TransactionContext context) {
+ return read(entry, context).thenApply(r -> {
+ if (r == null) {
+ return null;
+ }
+ ReferenceCountedObject<ByteString> ref =
ReferenceCountedObject.wrap(r);
+ ref.retain();
+ return ref;
+
+ });
+ }
+
/**
* Write asynchronously the state machine data in the given log entry to
this state machine.
*
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index b177f0e14..59da4c3fd 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -137,8 +137,9 @@ public final class LogProtoUtils {
}
private static LogEntryProto
replaceStateMachineDataWithSerializedSize(LogEntryProto entry) {
- return replaceStateMachineEntry(entry,
+ LogEntryProto replaced = replaceStateMachineEntry(entry,
StateMachineEntryProto.newBuilder().setLogEntryProtoSerializedSize(entry.getSerializedSize()));
+ return copy(replaced);
}
private static LogEntryProto replaceStateMachineEntry(LogEntryProto proto,
StateMachineEntryProto.Builder newEntry) {
@@ -160,6 +161,13 @@ public final class LogProtoUtils {
return replaceStateMachineEntry(entry,
StateMachineEntryProto.newBuilder().setStateMachineData(stateMachineData));
}
+ public static boolean hasStateMachineData(LogEntryProto entry) {
+ return getStateMachineEntry(entry)
+ .map(StateMachineEntryProto::getStateMachineData)
+ .map(data -> !data.isEmpty())
+ .orElse(false);
+ }
+
public static boolean isStateMachineDataEmpty(LogEntryProto entry) {
return getStateMachineEntry(entry)
.map(StateMachineEntryProto::getStateMachineData)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 284776d10..9e079564f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.raftlog;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -410,8 +411,43 @@ public abstract class RaftLogBase implements RaftLog {
return name;
}
- protected EntryWithData newEntryWithData(LogEntryProto logEntry,
CompletableFuture<ByteString> future) {
- return new EntryWithDataImpl(logEntry, future);
+ protected ReferenceCountedObject<EntryWithData>
newEntryWithData(ReferenceCountedObject<LogEntryProto> retained) {
+ return retained.delegate(new EntryWithDataImpl(retained.get(), null));
+ }
+
+ protected ReferenceCountedObject<EntryWithData>
newEntryWithData(ReferenceCountedObject<LogEntryProto> retained,
+ CompletableFuture<ReferenceCountedObject<ByteString>>
stateMachineDataFuture) {
+ final EntryWithDataImpl impl = new EntryWithDataImpl(retained.get(),
stateMachineDataFuture);
+ return new ReferenceCountedObject<EntryWithData>() {
+ private CompletableFuture<ReferenceCountedObject<ByteString>> future
+ = Objects.requireNonNull(stateMachineDataFuture,
"stateMachineDataFuture == null");
+
+ @Override
+ public EntryWithData get() {
+ return impl;
+ }
+
+ synchronized void updateFuture(Consumer<ReferenceCountedObject<?>>
action) {
+ future = future.whenComplete((ref, e) -> {
+ if (ref != null) {
+ action.accept(ref);
+ }
+ });
+ }
+
+ @Override
+ public EntryWithData retain() {
+ retained.retain();
+ updateFuture(ReferenceCountedObject::retain);
+ return impl;
+ }
+
+ @Override
+ public boolean release() {
+ updateFuture(ReferenceCountedObject::release);
+ return retained.release();
+ }
+ };
}
/**
@@ -419,14 +455,14 @@ public abstract class RaftLogBase implements RaftLog {
*/
class EntryWithDataImpl implements EntryWithData {
private final LogEntryProto logEntry;
- private final CompletableFuture<ByteString> future;
+ private final CompletableFuture<ReferenceCountedObject<ByteString>> future;
- EntryWithDataImpl(LogEntryProto logEntry, CompletableFuture<ByteString>
future) {
+ EntryWithDataImpl(LogEntryProto logEntry,
CompletableFuture<ReferenceCountedObject<ByteString>> future) {
this.logEntry = logEntry;
this.future = future == null? null:
future.thenApply(this::checkStateMachineData);
}
- private ByteString checkStateMachineData(ByteString data) {
+ private ReferenceCountedObject<ByteString>
checkStateMachineData(ReferenceCountedObject<ByteString> data) {
if (data == null) {
throw new IllegalStateException("State machine data is null for log
entry " + this);
}
@@ -450,18 +486,21 @@ public abstract class RaftLogBase implements RaftLog {
}
final LogEntryProto entryProto;
+ ReferenceCountedObject<ByteString> data;
try {
- entryProto = future.thenApply(data ->
LogProtoUtils.addStateMachineData(data, logEntry))
- .get(timeout.getDuration(), timeout.getUnit());
+ data = future.get(timeout.getDuration(), timeout.getUnit());
+ entryProto = LogProtoUtils.addStateMachineData(data.get(), logEntry);
} catch (TimeoutException t) {
if (timeout.compareTo(stateMachineDataReadTimeout) > 0) {
getRaftLogMetrics().onStateMachineDataReadTimeout();
}
+ discardData();
throw t;
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
+ discardData();
final String err = getName() + ": Failed readStateMachineData for " +
this;
LOG.error(err, e);
throw new RaftLogIOException(err,
JavaUtils.unwrapCompletionException(e));
@@ -471,11 +510,20 @@ public abstract class RaftLogBase implements RaftLog {
if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
final String err = getName() + ": State machine data not set for " +
this;
LOG.error(err);
+ data.release();
throw new RaftLogIOException(err);
}
return entryProto;
}
+ private void discardData() {
+ future.whenComplete((r, ex) -> {
+ if (r != null) {
+ r.release();
+ }
+ });
+ }
+
@Override
public String toString() {
return toLogEntryString(logEntry);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 55036fac5..2aac6c1b1 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -142,7 +142,7 @@ public class MemoryRaftLog extends RaftLogBase {
@Override
public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index)
{
final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
- return ref.delegate(newEntryWithData(ref.get(), null));
+ return newEntryWithData(ref);
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 2fcd7914e..2542e9998 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -73,7 +73,7 @@ public final class LogSegment {
case CHECK_SEGMENT_FILE_FULL:
case LOAD_SEGMENT_FILE:
case WRITE_CACHE_WITH_STATE_MACHINE_CACHE:
- Preconditions.assertTrue(entry ==
LogProtoUtils.removeStateMachineData(entry),
+ Preconditions.assertTrue(!LogProtoUtils.hasStateMachineData(entry),
() -> "Unexpected LogEntryProto with StateMachine data: op=" + op
+ ", entry=" + entry);
break;
case WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE:
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index b7dd32689..a0f7c1e72 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -331,18 +331,18 @@ public final class SegmentedRaftLog extends RaftLogBase {
final LogEntryProto entry = entryRef.get();
if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
- return entryRef.delegate(newEntryWithData(entry, null));
+ return newEntryWithData(entryRef);
}
try {
- CompletableFuture<ByteString> future = null;
+ CompletableFuture<ReferenceCountedObject<ByteString>> future = null;
if (stateMachine != null) {
- future = stateMachine.data().read(entry,
server.getTransactionContext(entry, false)).exceptionally(ex -> {
+ future = stateMachine.data().retainRead(entry,
server.getTransactionContext(entry, false)).exceptionally(ex -> {
stateMachine.event().notifyLogFailed(ex, entry);
throw new CompletionException("Failed to read state machine data for
log entry " + entry, ex);
});
}
- return entryRef.delegate(newEntryWithData(entry, future));
+ return future != null? newEntryWithData(entryRef, future):
newEntryWithData(entryRef);
} catch (Exception e) {
final String err = getName() + ": Failed readStateMachineData for " +
LogProtoUtils.toLogEntryString(entry);
@@ -459,7 +459,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
if (stateMachineCachingEnabled) {
// The stateMachineData will be cached inside the StateMachine itself.
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
- entryRef.delegate(removedStateMachineData));
+ ReferenceCountedObject.wrap(removedStateMachineData));
} else {
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE,
entryRef
);