This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ba8e62cd2 RATIS-1897. Make TransactionContext available in DataApi.write(..). (#930)
ba8e62cd2 is described below
commit ba8e62cd2f22593af22317ea7c1cc5a0b08a23b3
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 12 02:51:36 2023 -0700
RATIS-1897. Make TransactionContext available in DataApi.write(..). (#930)
---
.../apache/ratis/examples/filestore/FileStore.java | 2 +-
.../ratis/examples/filestore/FileStoreCommon.java | 3 +-
.../examples/filestore/FileStoreStateMachine.java | 71 ++++++------
.../ratis/server/raftlog/RaftLogSequentialOps.java | 8 ++
.../apache/ratis/statemachine/StateMachine.java | 38 ++++++-
.../apache/ratis/server/impl/RaftServerImpl.java | 33 ++++--
.../org/apache/ratis/server/impl/ServerState.java | 15 ++-
.../ratis/server/impl/TransactionManager.java | 44 ++++++++
.../apache/ratis/server/raftlog/RaftLogBase.java | 11 +-
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 3 +-
.../server/raftlog/segmented/SegmentedRaftLog.java | 124 +++++++++++++++++----
.../raftlog/segmented/SegmentedRaftLogWorker.java | 9 +-
.../ratis/server/impl/RaftServerTestUtil.java | 13 ++-
.../ratis/server/impl/RetryCacheTestUtil.java | 12 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 29 ++++-
15 files changed, 322 insertions(+), 93 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index 04ff64bd8..a930170ec 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -239,7 +239,7 @@ public class FileStore implements Closeable {
uc = files.get(relative).asUnderConstruction();
} catch (FileNotFoundException e) {
return FileStoreCommon.completeExceptionally(
- index, "Failed to write to " + relative, e);
+ index, "Failed to submitCommit to " + relative, e);
}
return uc.submitCommit(offset, size, converter, committer, getId(), index)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
index 152bc5a23..c96662b2a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
@@ -59,7 +59,6 @@ public interface FileStoreCommon {
static <T> CompletableFuture<T> completeExceptionally(
String message, Throwable cause) {
- return JavaUtils.completeExceptionally(
- new IOException(message).initCause(cause));
+ return JavaUtils.completeExceptionally(new IOException(message, cause));
}
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 209baaf7d..5f258ee3b 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -26,8 +26,8 @@ import org.apache.ratis.proto.ExamplesProtos.ReadRequestProto;
import org.apache.ratis.proto.ExamplesProtos.StreamWriteRequestProto;
import org.apache.ratis.proto.ExamplesProtos.WriteRequestHeaderProto;
import org.apache.ratis.proto.ExamplesProtos.WriteRequestProto;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
@@ -97,29 +97,32 @@ public class FileStoreStateMachine extends BaseStateMachine {
final TransactionContext.Builder b = TransactionContext.newBuilder()
.setStateMachine(this)
.setClientRequest(request);
-
if (proto.getRequestCase() == FileStoreRequestProto.RequestCase.WRITE) {
final WriteRequestProto write = proto.getWrite();
final FileStoreRequestProto newProto = FileStoreRequestProto.newBuilder()
.setWriteHeader(write.getHeader()).build();
-
b.setLogData(newProto.toByteString()).setStateMachineData(write.getData());
+
b.setLogData(newProto.toByteString()).setStateMachineData(write.getData())
+ .setStateMachineContext(newProto);
} else {
- b.setLogData(content);
+ b.setLogData(content)
+ .setStateMachineContext(proto);
}
return b.build();
}
@Override
- public CompletableFuture<Integer> write(LogEntryProto entry) {
- final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
- final ByteString data = smLog.getLogData();
- final FileStoreRequestProto proto;
- try {
- proto = FileStoreRequestProto.parseFrom(data);
- } catch (InvalidProtocolBufferException e) {
- return FileStoreCommon.completeExceptionally(
- entry.getIndex(), "Failed to parse data, entry=" + entry, e);
- }
+ public TransactionContext startTransaction(LogEntryProto entry,
RaftProtos.RaftPeerRole role) {
+ return TransactionContext.newBuilder()
+ .setStateMachine(this)
+ .setLogEntry(entry)
+ .setServerRole(role)
+ .setStateMachineContext(getProto(entry))
+ .build();
+ }
+
+ @Override
+ public CompletableFuture<Integer> write(LogEntryProto entry,
TransactionContext context) {
+ final FileStoreRequestProto proto = getProto(context, entry);
if (proto.getRequestCase() !=
FileStoreRequestProto.RequestCase.WRITEHEADER) {
return null;
}
@@ -127,22 +130,32 @@ public class FileStoreStateMachine extends BaseStateMachine {
final WriteRequestHeaderProto h = proto.getWriteHeader();
final CompletableFuture<Integer> f = files.write(entry.getIndex(),
h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(),
- smLog.getStateMachineEntry().getStateMachineData());
+
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData());
// sync only if closing the file
return h.getClose()? f: null;
}
- @Override
- public CompletableFuture<ByteString> read(LogEntryProto entry) {
- final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
- final ByteString data = smLog.getLogData();
- final FileStoreRequestProto proto;
+ static FileStoreRequestProto getProto(TransactionContext context,
LogEntryProto entry) {
+ if (context != null) {
+ final FileStoreRequestProto proto = (FileStoreRequestProto)
context.getStateMachineContext();
+ if (proto != null) {
+ return proto;
+ }
+ }
+ return getProto(entry);
+ }
+
+ static FileStoreRequestProto getProto(LogEntryProto entry) {
try {
- proto = FileStoreRequestProto.parseFrom(data);
+ return
FileStoreRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
} catch (InvalidProtocolBufferException e) {
- return FileStoreCommon.completeExceptionally(
- entry.getIndex(), "Failed to parse data, entry=" + entry, e);
+ throw new IllegalArgumentException("Failed to parse data, entry=" +
entry, e);
}
+ }
+
+ @Override
+ public CompletableFuture<ByteString> read(LogEntryProto entry,
TransactionContext context) {
+ final FileStoreRequestProto proto = getProto(context, entry);
if (proto.getRequestCase() !=
FileStoreRequestProto.RequestCase.WRITEHEADER) {
return null;
}
@@ -206,20 +219,14 @@ public class FileStoreStateMachine extends BaseStateMachine {
final long index = entry.getIndex();
updateLastAppliedTermIndex(entry.getTerm(), index);
- final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
- final FileStoreRequestProto request;
- try {
- request = FileStoreRequestProto.parseFrom(smLog.getLogData());
- } catch (InvalidProtocolBufferException e) {
- return FileStoreCommon.completeExceptionally(index,
- "Failed to parse logData in" + smLog, e);
- }
+ final FileStoreRequestProto request = getProto(trx, entry);
switch(request.getRequestCase()) {
case DELETE:
return delete(index, request.getDelete());
case WRITEHEADER:
- return writeCommit(index, request.getWriteHeader(),
smLog.getStateMachineEntry().getStateMachineData().size());
+ return writeCommit(index, request.getWriteHeader(),
+
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData().size());
case STREAM:
return streamCommit(request.getStream());
case WRITE:
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index 32bd564e0..7b9f42b6b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -121,6 +121,14 @@ interface RaftLogSequentialOps {
*/
CompletableFuture<Long> appendEntry(LogEntryProto entry);
+ /**
+ * Append asynchronously an entry.
+ * Used by the leader.
+ */
+ default CompletableFuture<Long> appendEntry(LogEntryProto entry,
TransactionContext context) {
+ return appendEntry(entry);
+ }
+
/**
* The same as append(Arrays.asList(entries)).
*
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 90813f325..b1fc5adda 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -79,6 +79,15 @@ public interface StateMachine extends Closeable {
throw new UnsupportedOperationException("This method is NOT supported.");
}
+ /**
+ * Read asynchronously the state machine data from this state machine.
+ *
+ * @return a future for the read task.
+ */
+ default CompletableFuture<ByteString> read(LogEntryProto entry,
TransactionContext context) {
+ return read(entry);
+ }
+
/**
* Write asynchronously the state machine data in the given log entry to
this state machine.
*
@@ -88,6 +97,15 @@ public interface StateMachine extends Closeable {
return CompletableFuture.completedFuture(null);
}
+ /**
+ * Write asynchronously the state machine data in the given log entry to
this state machine.
+ *
+ * @return a future for the write task
+ */
+ default CompletableFuture<?> write(LogEntryProto entry, TransactionContext
context) {
+ return write(entry);
+ }
+
/**
* Create asynchronously a {@link DataStream} to stream state machine data.
* The state machine may use the first message (i.e. request.getMessage())
as the header to create the stream.
@@ -483,14 +501,30 @@ public interface StateMachine extends Closeable {
* and then build a {@link TransactionContext}.
* The implementation should also be light-weighted.
*
- * @return null if the request should be rejected.
- * Otherwise, return a transaction with the content to be written to
the log.
+ * @return a transaction with the content to be written to the log.
* @throws IOException thrown by the state machine while validation
*
* @see TransactionContext.Builder
*/
TransactionContext startTransaction(RaftClientRequest request) throws
IOException;
+ /**
+ * Start a transaction for the given log entry for non-leaders.
+ * This method can be invoked in parallel when there are multiple requests.
+ * The implementation should prepare a {@link StateMachineLogEntryProto},
+ * and then build a {@link TransactionContext}.
+ * The implementation should also be light-weighted.
+ *
+ * @return a transaction with the content to be written to the log.
+ */
+ default TransactionContext startTransaction(LogEntryProto entry,
RaftPeerRole role) {
+ return TransactionContext.newBuilder()
+ .setStateMachine(this)
+ .setLogEntry(entry)
+ .setServerRole(role)
+ .build();
+ }
+
/**
* This is called before the transaction passed from the StateMachine is
appended to the raft log.
* This method is called with the same strict serial order as the
transaction order in the raft log.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3fb0cb2fa..6127972cb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -135,6 +135,7 @@ import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.JmxRegister;
import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
@@ -222,6 +223,7 @@ class RaftServerImpl implements RaftServer.Division,
private final DataStreamMap dataStreamMap;
private final RaftServerConfigKeys.Read.Option readOption;
+ private final TransactionManager transactionManager = new
TransactionManager();
private final RetryCacheImpl retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final WriteIndexCache writeIndexCache;
@@ -1784,6 +1786,7 @@ class RaftServerImpl implements RaftServer.Division,
}
return stateMachineFuture.whenComplete((reply, exception) -> {
+ transactionManager.remove(logIndex);
final RaftClientReply.Builder b = newReplyBuilder(invocationId,
logIndex);
final RaftClientReply r;
if (exception == null) {
@@ -1801,6 +1804,27 @@ class RaftServerImpl implements RaftServer.Division,
});
}
+ TransactionContext getTransactionContext(LogEntryProto entry, Boolean
createNew) {
+ if (!entry.hasStateMachineLogEntry()) {
+ return null;
+ }
+
+ final Optional<LeaderStateImpl> leader = getRole().getLeaderState();
+ if (leader.isPresent()) {
+ final TransactionContext context =
leader.get().getTransactionContext(entry.getIndex());
+ if (context != null) {
+ return context;
+ }
+ }
+
+ if (!createNew) {
+ return transactionManager.get(entry.getIndex());
+ }
+ return transactionManager.computeIfAbsent(entry.getIndex(),
+ // call startTransaction only once
+ MemoizedSupplier.valueOf(() -> stateMachine.startTransaction(entry,
getInfo().getCurrentRole())));
+ }
+
CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws
RaftLogIOException {
if (!next.hasStateMachineLogEntry()) {
stateMachine.event().notifyTermIndexUpdated(next.getTerm(),
next.getIndex());
@@ -1813,14 +1837,7 @@ class RaftServerImpl implements RaftServer.Division,
stateMachine.event().notifyConfigurationChanged(next.getTerm(),
next.getIndex(), next.getConfigurationEntry());
role.getLeaderState().ifPresent(leader -> leader.checkReady(next));
} else if (next.hasStateMachineLogEntry()) {
- // check whether there is a TransactionContext because we are the leader.
- TransactionContext trx = role.getLeaderState()
- .map(leader -> leader.getTransactionContext(next.getIndex()))
- .orElseGet(() -> TransactionContext.newBuilder()
- .setServerRole(role.getCurrentRole())
- .setStateMachine(stateMachine)
- .setLogEntry(next)
- .build());
+ TransactionContext trx = getTransactionContext(next, true);
final ClientInvocationId invocationId =
ClientInvocationId.valueOf(next.getStateMachineLogEntry());
writeIndexCache.add(invocationId.getClientId(),
((TransactionContextImpl) trx).getLogIndexFuture());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 2f2f36d79..315d61055 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -177,11 +177,16 @@ class ServerState {
if (RaftServerConfigKeys.Log.useMemory(prop)) {
log = new MemoryRaftLog(memberId, getSnapshotIndexFromStateMachine,
prop);
} else {
- log = new SegmentedRaftLog(memberId, server,
- server.getStateMachine(),
- server::notifyTruncatedLogEntry,
- server::submitUpdateCommitEvent,
- storage, getSnapshotIndexFromStateMachine, prop);
+ log = SegmentedRaftLog.newBuilder()
+ .setMemberId(memberId)
+ .setServer(server)
+ .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry)
+ .setGetTransactionContext(server::getTransactionContext)
+ .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent)
+ .setStorage(storage)
+ .setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine)
+ .setProperties(prop)
+ .build();
}
log.open(log.getSnapshotIndex(), logConsumer);
return log;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
new file mode 100644
index 000000000..aa989cf98
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.statemachine.TransactionContext;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+
+/**
+ * Managing {@link TransactionContext}.
+ */
+class TransactionManager {
+ private final ConcurrentMap<Long, Supplier<TransactionContext>> contexts =
new ConcurrentHashMap<>();
+
+ TransactionContext get(long index) {
+ return
Optional.ofNullable(contexts.get(index)).map(Supplier::get).orElse(null);
+ }
+
+ TransactionContext computeIfAbsent(long index, Supplier<TransactionContext>
constructor) {
+ return contexts.computeIfAbsent(index, i -> constructor).get();
+ }
+
+ void remove(long index) {
+ contexts.remove(index);
+ }
+}
\ No newline at end of file
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 31865038e..708b499cd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -185,7 +185,7 @@ public abstract class RaftLogBase implements RaftLog {
throw new StateMachineException(memberId, new RaftLogIOException(
"Log entry size " + entrySize + " exceeds the max buffer limit of
" + maxBufferSize));
}
- appendEntry(e).whenComplete((returned, t) -> {
+ appendEntry(e, operation).whenComplete((returned, t) -> {
if (t != null) {
LOG.error(name + ": Failed to write log entry " +
LogProtoUtils.toLogEntryString(e), t);
} else if (returned != nextIndex) {
@@ -343,10 +343,15 @@ public abstract class RaftLogBase implements RaftLog {
@Override
public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
- return runner.runSequentially(() -> appendEntryImpl(entry));
+ return appendEntry(entry, null);
}
- protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto
entry);
+ @Override
+ public final CompletableFuture<Long> appendEntry(LogEntryProto entry,
TransactionContext context) {
+ return runner.runSequentially(() -> appendEntryImpl(entry, context));
+ }
+
+ protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto
entry, TransactionContext context);
@Override
public final List<CompletableFuture<Long>> append(List<LogEntryProto>
entries) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 0eb7fb159..ebb1e27d7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -25,6 +25,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.storage.RaftStorageMetadata;
+import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;
@@ -165,7 +166,7 @@ public class MemoryRaftLog extends RaftLogBase {
}
@Override
- protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
+ protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry,
TransactionContext context) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 255bec291..a729f8e2e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorageMetadata;
@@ -34,6 +35,7 @@ import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AwaitToRun;
@@ -49,6 +51,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
@@ -80,7 +83,7 @@ import org.apache.ratis.util.UncheckedAutoCloseable;
* in segments should be no smaller than the last index of snapshot, otherwise
* we may have hole when append further log.
*/
-public class SegmentedRaftLog extends RaftLogBase {
+public final class SegmentedRaftLog extends RaftLogBase {
/**
* I/O task definitions.
*/
@@ -145,6 +148,10 @@ public class SegmentedRaftLog extends RaftLogBase {
/** Notify the server that a log entry is being truncated. */
default void notifyTruncatedLogEntry(TermIndex ti) {
}
+
+ default TransactionContext getTransactionContext(LogEntryProto entry,
boolean createNew) {
+ return null;
+ }
}
/**
@@ -152,7 +159,8 @@ public class SegmentedRaftLog extends RaftLogBase {
* Otherwise, the server is non-null, return the implementation using the
given server.
*/
private ServerLogMethods newServerLogMethods(RaftServer.Division impl,
- Consumer<LogEntryProto> notifyTruncatedLogEntry) {
+ Consumer<LogEntryProto> notifyTruncatedLogEntry,
+ BiFunction<LogEntryProto, Boolean, TransactionContext>
getTransactionContext) {
if (impl == null) {
return ServerLogMethods.DUMMY;
}
@@ -177,6 +185,11 @@ public class SegmentedRaftLog extends RaftLogBase {
LOG.error("{}: Failed to read log {}", getName(), ti, e);
}
}
+
+ @Override
+ public TransactionContext getTransactionContext(LogEntryProto entry,
boolean createNew) {
+ return getTransactionContext.apply(entry, createNew);
+ }
};
}
@@ -190,22 +203,19 @@ public class SegmentedRaftLog extends RaftLogBase {
private final boolean stateMachineCachingEnabled;
private final SegmentedRaftLogMetrics metrics;
- @SuppressWarnings("parameternumber")
- public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division
server,
- StateMachine stateMachine, Consumer<LogEntryProto>
notifyTruncatedLogEntry, Runnable submitUpdateCommitEvent,
- RaftStorage storage, LongSupplier snapshotIndexSupplier, RaftProperties
properties) {
- super(memberId, snapshotIndexSupplier, properties);
- this.metrics = new SegmentedRaftLogMetrics(memberId);
-
- this.server = newServerLogMethods(server, notifyTruncatedLogEntry);
- this.storage = storage;
- this.stateMachine = stateMachine;
- segmentMaxSize =
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
- this.cache = new SegmentedRaftLogCache(memberId, storage, properties,
getRaftLogMetrics());
- this.cacheEviction = new AwaitToRun(memberId + "-cacheEviction",
this::checkAndEvictCache).start();
- this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
- submitUpdateCommitEvent, server, storage, properties,
getRaftLogMetrics());
- stateMachineCachingEnabled =
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
+ private SegmentedRaftLog(Builder b) {
+ super(b.memberId, b.snapshotIndexSupplier, b.properties);
+ this.metrics = new SegmentedRaftLogMetrics(b.memberId);
+
+ this.server = newServerLogMethods(b.server, b.notifyTruncatedLogEntry,
b.getTransactionContext);
+ this.storage = b.storage;
+ this.stateMachine = b.stateMachine;
+ this.segmentMaxSize =
RaftServerConfigKeys.Log.segmentSizeMax(b.properties).getSize();
+ this.cache = new SegmentedRaftLogCache(b.memberId, storage, b.properties,
getRaftLogMetrics());
+ this.cacheEviction = new AwaitToRun(b.memberId + "-cacheEviction",
this::checkAndEvictCache).start();
+ this.fileLogWorker = new SegmentedRaftLogWorker(b.memberId, stateMachine,
+ b.submitUpdateCommitEvent, b.server, storage, b.properties,
getRaftLogMetrics());
+ stateMachineCachingEnabled =
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(b.properties);
}
@Override
@@ -298,7 +308,7 @@ public class SegmentedRaftLog extends RaftLogBase {
try {
CompletableFuture<ByteString> future = null;
if (stateMachine != null) {
- future = stateMachine.data().read(entry).exceptionally(ex -> {
+ future = stateMachine.data().read(entry,
server.getTransactionContext(entry, false)).exceptionally(ex -> {
stateMachine.event().notifyLogFailed(ex, entry);
throw new CompletionException("Failed to read state machine data for
log entry " + entry, ex);
});
@@ -376,7 +386,7 @@ public class SegmentedRaftLog extends RaftLogBase {
}
@Override
- protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
+ protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry,
TransactionContext context) {
checkLogState();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: appendEntry {}", getName(),
LogProtoUtils.toLogEntryString(entry));
@@ -412,7 +422,7 @@ public class SegmentedRaftLog extends RaftLogBase {
// to statemachine first and then to the cache. Not following the order
// will leave a spurious entry in the cache.
CompletableFuture<Long> writeFuture =
- fileLogWorker.writeLogEntry(entry).getFuture();
+ fileLogWorker.writeLogEntry(entry, context).getFuture();
if (stateMachineCachingEnabled) {
// The stateMachineData will be cached inside the StateMachine itself.
cache.appendEntry(LogProtoUtils.removeStateMachineData(entry),
@@ -460,7 +470,8 @@ public class SegmentedRaftLog extends RaftLogBase {
futures = new ArrayList<>(entries.size() - index);
}
for (int i = index; i < entries.size(); i++) {
- futures.add(appendEntry(entries.get(i)));
+ final LogEntryProto entry = entries.get(i);
+ futures.add(appendEntry(entry, server.getTransactionContext(entry,
true)));
}
return futures;
}
@@ -528,4 +539,73 @@ public class SegmentedRaftLog extends RaftLogBase {
public String toLogEntryString(LogEntryProto logEntry) {
return LogProtoUtils.toLogEntryString(logEntry,
stateMachine::toStateMachineLogEntryString);
}
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private RaftGroupMemberId memberId;
+ private RaftServer.Division server;
+ private StateMachine stateMachine;
+ private Consumer<LogEntryProto> notifyTruncatedLogEntry;
+ private BiFunction<LogEntryProto, Boolean, TransactionContext>
getTransactionContext;
+ private Runnable submitUpdateCommitEvent;
+ private RaftStorage storage;
+ private LongSupplier snapshotIndexSupplier = () ->
RaftLog.INVALID_LOG_INDEX;
+ private RaftProperties properties;
+
+ private Builder() {}
+
+ public Builder setMemberId(RaftGroupMemberId memberId) {
+ this.memberId = memberId;
+ return this;
+ }
+
+ public Builder setServer(RaftServer.Division server) {
+ this.server = server;
+ this.stateMachine = server.getStateMachine();
+ return this;
+ }
+
+ public Builder setStateMachine(StateMachine stateMachine) {
+ this.stateMachine = stateMachine;
+ return this;
+ }
+
+ public Builder setNotifyTruncatedLogEntry(Consumer<LogEntryProto>
notifyTruncatedLogEntry) {
+ this.notifyTruncatedLogEntry = notifyTruncatedLogEntry;
+ return this;
+ }
+
+ public Builder setGetTransactionContext(
+ BiFunction<LogEntryProto, Boolean, TransactionContext>
getTransactionContext) {
+ this.getTransactionContext = getTransactionContext;
+ return this;
+ }
+
+ public Builder setSubmitUpdateCommitEvent(Runnable
submitUpdateCommitEvent) {
+ this.submitUpdateCommitEvent = submitUpdateCommitEvent;
+ return this;
+ }
+
+ public Builder setStorage(RaftStorage storage) {
+ this.storage = storage;
+ return this;
+ }
+
+ public Builder setSnapshotIndexSupplier(LongSupplier
snapshotIndexSupplier) {
+ this.snapshotIndexSupplier = snapshotIndexSupplier;
+ return this;
+ }
+
+ public Builder setProperties(RaftProperties properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ public SegmentedRaftLog build() {
+ return new SegmentedRaftLog(this);
+ }
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 9f34f2917..18fd68012 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -37,6 +37,7 @@ import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog.Task;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -435,8 +436,8 @@ class SegmentedRaftLogWorker {
addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
}
- Task writeLogEntry(LogEntryProto entry) {
- return addIOTask(new WriteLog(entry));
+ Task writeLogEntry(LogEntryProto entry, TransactionContext context) {
+ return addIOTask(new WriteLog(entry, context));
}
Task truncate(TruncationSegments ts, long index) {
@@ -483,7 +484,7 @@ class SegmentedRaftLogWorker {
private final CompletableFuture<?> stateMachineFuture;
private final CompletableFuture<Long> combined;
- WriteLog(LogEntryProto entry) {
+ WriteLog(LogEntryProto entry, TransactionContext context) {
this.entry = LogProtoUtils.removeStateMachineData(entry);
if (this.entry == entry) {
final StateMachineLogEntryProto proto =
entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
@@ -498,7 +499,7 @@ class SegmentedRaftLogWorker {
} else {
try {
// this.entry != entry iff the entry has state machine data
- this.stateMachineFuture = stateMachine.data().write(entry);
+ this.stateMachineFuture = stateMachine.data().write(entry, context);
} catch (Exception e) {
LOG.error(name + ": writeStateMachineData failed for index " + entry.getIndex()
+ ", entry=" + LogProtoUtils.toLogEntryString(entry, stateMachine::toStateMachineLogEntryString), e);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 958c19442..73482dcf8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -182,10 +182,15 @@ public class RaftServerTestUtil {
final RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
Mockito.when(server.getInfo()).thenReturn(info);
- return new SegmentedRaftLog(memberId, server, null,
- server::notifyTruncatedLogEntry,
- server::submitUpdateCommitEvent,
- storage, () -> -1, properties);
+ return SegmentedRaftLog.newBuilder()
+ .setMemberId(memberId)
+ .setServer(server)
+ .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry)
+ .setGetTransactionContext(server::getTransactionContext)
+ .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent)
+ .setStorage(storage)
+ .setProperties(properties)
+ .build();
}
public static boolean isHighestPriority(RaftConfiguration config, RaftPeerId peerId) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
index 9ab814cc6..f59958a94 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
@@ -81,8 +81,14 @@ public class RetryCacheTestUtil {
when(server.getRetryCache()).thenReturn((RetryCacheImpl) retryCache);
when(server.getMemberId()).thenReturn(memberId);
doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
- return new SegmentedRaftLog(memberId, server, null,
- server::notifyTruncatedLogEntry, server::submitUpdateCommitEvent,
- storage, () -> -1, properties);
+ return SegmentedRaftLog.newBuilder()
+ .setMemberId(memberId)
+ .setServer(server)
+ .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry)
+ .setGetTransactionContext(server::getTransactionContext)
+ .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent)
+ .setStorage(storage)
+ .setProperties(properties)
+ .build();
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 6dc75a3d3..abc36e4ee 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -136,14 +136,21 @@ public class TestSegmentedRaftLog extends BaseTest {
}
static SegmentedRaftLog newSegmentedRaftLog(RaftStorage storage, RaftProperties properties) {
- return new SegmentedRaftLog(memberId, null, null, null, null, storage,
- () -> -1, properties);
+ return SegmentedRaftLog.newBuilder()
+ .setMemberId(memberId)
+ .setStorage(storage)
+ .setProperties(properties)
+ .build();
}
private SegmentedRaftLog newSegmentedRaftLogWithSnapshotIndex(RaftStorage storage, RaftProperties properties,
LongSupplier getSnapshotIndexFromStateMachine) {
- return new SegmentedRaftLog(memberId, null, null, null, null, storage,
- getSnapshotIndexFromStateMachine, properties);
+ return SegmentedRaftLog.newBuilder()
+ .setMemberId(memberId)
+ .setStorage(storage)
+ .setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine)
+ .setProperties(properties)
+ .build();
}
@Before
@@ -576,7 +583,12 @@ public class TestSegmentedRaftLog extends BaseTest {
final List<LogEntryProto> entries = prepareLogEntries(range, null, true,
new ArrayList<>());
final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, () -> -1, properties)) {
+ try (SegmentedRaftLog raftLog = SegmentedRaftLog.newBuilder()
+ .setMemberId(memberId)
+ .setStateMachine(sm)
+ .setStorage(storage)
+ .setProperties(properties)
+ .build()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
int next = 0;
@@ -641,7 +653,12 @@ public class TestSegmentedRaftLog extends BaseTest {
};
Throwable ex = null; // TimeoutIOException
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, () -> -1, properties)) {
+ try (SegmentedRaftLog raftLog = SegmentedRaftLog.newBuilder()
+ .setMemberId(memberId)
+ .setStateMachine(sm)
+ .setStorage(storage)
+ .setProperties(properties)
+ .build()) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// SegmentedRaftLogWorker should catch TimeoutIOException
CompletableFuture<Long> f = raftLog.appendEntry(entry);