This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ba8e62cd2 RATIS-1897. Make TransactionContext available in DataApi.write(..). (#930)
ba8e62cd2 is described below

commit ba8e62cd2f22593af22317ea7c1cc5a0b08a23b3
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 12 02:51:36 2023 -0700

    RATIS-1897. Make TransactionContext available in DataApi.write(..). (#930)
---
 .../apache/ratis/examples/filestore/FileStore.java |   2 +-
 .../ratis/examples/filestore/FileStoreCommon.java  |   3 +-
 .../examples/filestore/FileStoreStateMachine.java  |  71 ++++++------
 .../ratis/server/raftlog/RaftLogSequentialOps.java |   8 ++
 .../apache/ratis/statemachine/StateMachine.java    |  38 ++++++-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  33 ++++--
 .../org/apache/ratis/server/impl/ServerState.java  |  15 ++-
 .../ratis/server/impl/TransactionManager.java      |  44 ++++++++
 .../apache/ratis/server/raftlog/RaftLogBase.java   |  11 +-
 .../ratis/server/raftlog/memory/MemoryRaftLog.java |   3 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java | 124 +++++++++++++++++----
 .../raftlog/segmented/SegmentedRaftLogWorker.java  |   9 +-
 .../ratis/server/impl/RaftServerTestUtil.java      |  13 ++-
 .../ratis/server/impl/RetryCacheTestUtil.java      |  12 +-
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  29 ++++-
 15 files changed, 322 insertions(+), 93 deletions(-)

diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index 04ff64bd8..a930170ec 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -239,7 +239,7 @@ public class FileStore implements Closeable {
       uc = files.get(relative).asUnderConstruction();
     } catch (FileNotFoundException e) {
       return FileStoreCommon.completeExceptionally(
-          index, "Failed to write to " + relative, e);
+          index, "Failed to submitCommit to " + relative, e);
     }
 
     return uc.submitCommit(offset, size, converter, committer, getId(), index)
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
index 152bc5a23..c96662b2a 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
@@ -59,7 +59,6 @@ public interface FileStoreCommon {
 
   static <T> CompletableFuture<T> completeExceptionally(
       String message, Throwable cause) {
-    return JavaUtils.completeExceptionally(
-        new IOException(message).initCause(cause));
+    return JavaUtils.completeExceptionally(new IOException(message, cause));
   }
 }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 209baaf7d..5f258ee3b 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -26,8 +26,8 @@ import org.apache.ratis.proto.ExamplesProtos.ReadRequestProto;
 import org.apache.ratis.proto.ExamplesProtos.StreamWriteRequestProto;
 import org.apache.ratis.proto.ExamplesProtos.WriteRequestHeaderProto;
 import org.apache.ratis.proto.ExamplesProtos.WriteRequestProto;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -97,29 +97,32 @@ public class FileStoreStateMachine extends BaseStateMachine 
{
     final TransactionContext.Builder b = TransactionContext.newBuilder()
         .setStateMachine(this)
         .setClientRequest(request);
-
     if (proto.getRequestCase() == FileStoreRequestProto.RequestCase.WRITE) {
       final WriteRequestProto write = proto.getWrite();
       final FileStoreRequestProto newProto = FileStoreRequestProto.newBuilder()
           .setWriteHeader(write.getHeader()).build();
-      
b.setLogData(newProto.toByteString()).setStateMachineData(write.getData());
+      
b.setLogData(newProto.toByteString()).setStateMachineData(write.getData())
+       .setStateMachineContext(newProto);
     } else {
-      b.setLogData(content);
+      b.setLogData(content)
+       .setStateMachineContext(proto);
     }
     return b.build();
   }
 
   @Override
-  public CompletableFuture<Integer> write(LogEntryProto entry) {
-    final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
-    final ByteString data = smLog.getLogData();
-    final FileStoreRequestProto proto;
-    try {
-      proto = FileStoreRequestProto.parseFrom(data);
-    } catch (InvalidProtocolBufferException e) {
-      return FileStoreCommon.completeExceptionally(
-          entry.getIndex(), "Failed to parse data, entry=" + entry, e);
-    }
+  public TransactionContext startTransaction(LogEntryProto entry, 
RaftProtos.RaftPeerRole role) {
+    return TransactionContext.newBuilder()
+        .setStateMachine(this)
+        .setLogEntry(entry)
+        .setServerRole(role)
+        .setStateMachineContext(getProto(entry))
+        .build();
+  }
+
+  @Override
+  public CompletableFuture<Integer> write(LogEntryProto entry, 
TransactionContext context) {
+    final FileStoreRequestProto proto = getProto(context, entry);
     if (proto.getRequestCase() != 
FileStoreRequestProto.RequestCase.WRITEHEADER) {
       return null;
     }
@@ -127,22 +130,32 @@ public class FileStoreStateMachine extends 
BaseStateMachine {
     final WriteRequestHeaderProto h = proto.getWriteHeader();
     final CompletableFuture<Integer> f = files.write(entry.getIndex(),
         h.getPath().toStringUtf8(), h.getClose(),  h.getSync(), h.getOffset(),
-        smLog.getStateMachineEntry().getStateMachineData());
+        
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData());
     // sync only if closing the file
     return h.getClose()? f: null;
   }
 
-  @Override
-  public CompletableFuture<ByteString> read(LogEntryProto entry) {
-    final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
-    final ByteString data = smLog.getLogData();
-    final FileStoreRequestProto proto;
+  static FileStoreRequestProto getProto(TransactionContext context, 
LogEntryProto entry) {
+    if (context != null) {
+      final FileStoreRequestProto proto = (FileStoreRequestProto) 
context.getStateMachineContext();
+      if (proto != null) {
+        return proto;
+      }
+    }
+    return getProto(entry);
+  }
+
+  static FileStoreRequestProto getProto(LogEntryProto entry) {
     try {
-      proto = FileStoreRequestProto.parseFrom(data);
+      return 
FileStoreRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
     } catch (InvalidProtocolBufferException e) {
-      return FileStoreCommon.completeExceptionally(
-          entry.getIndex(), "Failed to parse data, entry=" + entry, e);
+      throw new IllegalArgumentException("Failed to parse data, entry=" + 
entry, e);
     }
+  }
+
+  @Override
+  public CompletableFuture<ByteString> read(LogEntryProto entry, 
TransactionContext context) {
+    final FileStoreRequestProto proto = getProto(context, entry);
     if (proto.getRequestCase() != 
FileStoreRequestProto.RequestCase.WRITEHEADER) {
       return null;
     }
@@ -206,20 +219,14 @@ public class FileStoreStateMachine extends 
BaseStateMachine {
     final long index = entry.getIndex();
     updateLastAppliedTermIndex(entry.getTerm(), index);
 
-    final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
-    final FileStoreRequestProto request;
-    try {
-      request = FileStoreRequestProto.parseFrom(smLog.getLogData());
-    } catch (InvalidProtocolBufferException e) {
-      return FileStoreCommon.completeExceptionally(index,
-          "Failed to parse logData in" + smLog, e);
-    }
+    final FileStoreRequestProto request = getProto(trx, entry);
 
     switch(request.getRequestCase()) {
       case DELETE:
         return delete(index, request.getDelete());
       case WRITEHEADER:
-        return writeCommit(index, request.getWriteHeader(), 
smLog.getStateMachineEntry().getStateMachineData().size());
+        return writeCommit(index, request.getWriteHeader(),
+            
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData().size());
       case STREAM:
         return streamCommit(request.getStream());
       case WRITE:
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index 32bd564e0..7b9f42b6b 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -121,6 +121,14 @@ interface RaftLogSequentialOps {
    */
   CompletableFuture<Long> appendEntry(LogEntryProto entry);
 
+  /**
+   * Append asynchronously an entry.
+   * Used by the leader.
+   */
+  default CompletableFuture<Long> appendEntry(LogEntryProto entry, 
TransactionContext context) {
+    return appendEntry(entry);
+  }
+
   /**
    * The same as append(Arrays.asList(entries)).
    *
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 90813f325..b1fc5adda 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -79,6 +79,15 @@ public interface StateMachine extends Closeable {
       throw new UnsupportedOperationException("This method is NOT supported.");
     }
 
+    /**
+     * Read asynchronously the state machine data from this state machine.
+     *
+     * @return a future for the read task.
+     */
+    default CompletableFuture<ByteString> read(LogEntryProto entry, 
TransactionContext context) {
+      return read(entry);
+    }
+
     /**
      * Write asynchronously the state machine data in the given log entry to 
this state machine.
      *
@@ -88,6 +97,15 @@ public interface StateMachine extends Closeable {
       return CompletableFuture.completedFuture(null);
     }
 
+    /**
+     * Write asynchronously the state machine data in the given log entry to 
this state machine.
+     *
+     * @return a future for the write task
+     */
+    default CompletableFuture<?> write(LogEntryProto entry, TransactionContext 
context) {
+      return write(entry);
+    }
+
     /**
      * Create asynchronously a {@link DataStream} to stream state machine data.
      * The state machine may use the first message (i.e. request.getMessage()) 
as the header to create the stream.
@@ -483,14 +501,30 @@ public interface StateMachine extends Closeable {
    * and then build a {@link TransactionContext}.
    * The implementation should also be light-weighted.
    *
-   * @return null if the request should be rejected.
-   *         Otherwise, return a transaction with the content to be written to 
the log.
+   * @return a transaction with the content to be written to the log.
    * @throws IOException thrown by the state machine while validation
    *
    * @see TransactionContext.Builder
    */
   TransactionContext startTransaction(RaftClientRequest request) throws 
IOException;
 
+  /**
+   * Start a transaction for the given log entry for non-leaders.
+   * This method can be invoked in parallel when there are multiple requests.
+   * The implementation should prepare a {@link StateMachineLogEntryProto},
+   * and then build a {@link TransactionContext}.
+   * The implementation should also be light-weighted.
+   *
+   * @return a transaction with the content to be written to the log.
+   */
+  default TransactionContext startTransaction(LogEntryProto entry, 
RaftPeerRole role) {
+    return TransactionContext.newBuilder()
+        .setStateMachine(this)
+        .setLogEntry(entry)
+        .setServerRole(role)
+        .build();
+  }
+
   /**
    * This is called before the transaction passed from the StateMachine is 
appended to the raft log.
    * This method is called with the same strict serial order as the 
transaction order in the raft log.
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3fb0cb2fa..6127972cb 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -135,6 +135,7 @@ import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.JmxRegister;
 import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -222,6 +223,7 @@ class RaftServerImpl implements RaftServer.Division,
   private final DataStreamMap dataStreamMap;
   private final RaftServerConfigKeys.Read.Option readOption;
 
+  private final TransactionManager transactionManager = new 
TransactionManager();
   private final RetryCacheImpl retryCache;
   private final CommitInfoCache commitInfoCache = new CommitInfoCache();
   private final WriteIndexCache writeIndexCache;
@@ -1784,6 +1786,7 @@ class RaftServerImpl implements RaftServer.Division,
     }
 
     return stateMachineFuture.whenComplete((reply, exception) -> {
+      transactionManager.remove(logIndex);
       final RaftClientReply.Builder b = newReplyBuilder(invocationId, 
logIndex);
       final RaftClientReply r;
       if (exception == null) {
@@ -1801,6 +1804,27 @@ class RaftServerImpl implements RaftServer.Division,
     });
   }
 
+  TransactionContext getTransactionContext(LogEntryProto entry, Boolean 
createNew) {
+    if (!entry.hasStateMachineLogEntry()) {
+      return null;
+    }
+
+    final Optional<LeaderStateImpl> leader = getRole().getLeaderState();
+    if (leader.isPresent()) {
+      final TransactionContext context = 
leader.get().getTransactionContext(entry.getIndex());
+      if (context != null) {
+        return context;
+      }
+    }
+
+    if (!createNew) {
+      return transactionManager.get(entry.getIndex());
+    }
+    return transactionManager.computeIfAbsent(entry.getIndex(),
+        // call startTransaction only once
+        MemoizedSupplier.valueOf(() -> stateMachine.startTransaction(entry, 
getInfo().getCurrentRole())));
+  }
+
   CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws 
RaftLogIOException {
     if (!next.hasStateMachineLogEntry()) {
       stateMachine.event().notifyTermIndexUpdated(next.getTerm(), 
next.getIndex());
@@ -1813,14 +1837,7 @@ class RaftServerImpl implements RaftServer.Division,
       stateMachine.event().notifyConfigurationChanged(next.getTerm(), 
next.getIndex(), next.getConfigurationEntry());
       role.getLeaderState().ifPresent(leader -> leader.checkReady(next));
     } else if (next.hasStateMachineLogEntry()) {
-      // check whether there is a TransactionContext because we are the leader.
-      TransactionContext trx = role.getLeaderState()
-          .map(leader -> leader.getTransactionContext(next.getIndex()))
-          .orElseGet(() -> TransactionContext.newBuilder()
-                  .setServerRole(role.getCurrentRole())
-                  .setStateMachine(stateMachine)
-                  .setLogEntry(next)
-                  .build());
+      TransactionContext trx = getTransactionContext(next, true);
       final ClientInvocationId invocationId = 
ClientInvocationId.valueOf(next.getStateMachineLogEntry());
       writeIndexCache.add(invocationId.getClientId(), 
((TransactionContextImpl) trx).getLogIndexFuture());
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 2f2f36d79..315d61055 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -177,11 +177,16 @@ class ServerState {
     if (RaftServerConfigKeys.Log.useMemory(prop)) {
       log = new MemoryRaftLog(memberId, getSnapshotIndexFromStateMachine, 
prop);
     } else {
-      log = new SegmentedRaftLog(memberId, server,
-          server.getStateMachine(),
-          server::notifyTruncatedLogEntry,
-          server::submitUpdateCommitEvent,
-          storage, getSnapshotIndexFromStateMachine, prop);
+      log = SegmentedRaftLog.newBuilder()
+          .setMemberId(memberId)
+          .setServer(server)
+          .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry)
+          .setGetTransactionContext(server::getTransactionContext)
+          .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent)
+          .setStorage(storage)
+          .setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine)
+          .setProperties(prop)
+          .build();
     }
     log.open(log.getSnapshotIndex(), logConsumer);
     return log;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
new file mode 100644
index 000000000..aa989cf98
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.statemachine.TransactionContext;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+
+/**
+ * Managing {@link TransactionContext}.
+ */
+class TransactionManager {
+  private final ConcurrentMap<Long, Supplier<TransactionContext>> contexts = 
new ConcurrentHashMap<>();
+
+  TransactionContext get(long index) {
+    return 
Optional.ofNullable(contexts.get(index)).map(Supplier::get).orElse(null);
+  }
+
+  TransactionContext computeIfAbsent(long index, Supplier<TransactionContext> 
constructor) {
+    return contexts.computeIfAbsent(index, i -> constructor).get();
+  }
+
+  void remove(long index) {
+    contexts.remove(index);
+  }
+}
\ No newline at end of file
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 31865038e..708b499cd 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -185,7 +185,7 @@ public abstract class RaftLogBase implements RaftLog {
         throw new StateMachineException(memberId, new RaftLogIOException(
             "Log entry size " + entrySize + " exceeds the max buffer limit of 
" + maxBufferSize));
       }
-      appendEntry(e).whenComplete((returned, t) -> {
+      appendEntry(e, operation).whenComplete((returned, t) -> {
         if (t != null) {
           LOG.error(name + ": Failed to write log entry " + 
LogProtoUtils.toLogEntryString(e), t);
         } else if (returned != nextIndex) {
@@ -343,10 +343,15 @@ public abstract class RaftLogBase implements RaftLog {
 
   @Override
   public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
-    return runner.runSequentially(() -> appendEntryImpl(entry));
+    return appendEntry(entry, null);
   }
 
-  protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto 
entry);
+  @Override
+  public final CompletableFuture<Long> appendEntry(LogEntryProto entry, 
TransactionContext context) {
+    return runner.runSequentially(() -> appendEntryImpl(entry, context));
+  }
+
+  protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto 
entry, TransactionContext context);
 
   @Override
   public final List<CompletableFuture<Long>> append(List<LogEntryProto> 
entries) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 0eb7fb159..ebb1e27d7 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -25,6 +25,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.storage.RaftStorageMetadata;
+import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.Preconditions;
 
@@ -165,7 +166,7 @@ public class MemoryRaftLog extends RaftLogBase {
   }
 
   @Override
-  protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
+  protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry, 
TransactionContext context) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       validateLogEntry(entry);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 255bec291..a729f8e2e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -26,6 +26,7 @@ import 
org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorageMetadata;
@@ -34,6 +35,7 @@ import 
org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
 import 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.AwaitToRun;
@@ -49,6 +51,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 
@@ -80,7 +83,7 @@ import org.apache.ratis.util.UncheckedAutoCloseable;
  * in segments should be no smaller than the last index of snapshot, otherwise
  * we may have hole when append further log.
  */
-public class SegmentedRaftLog extends RaftLogBase {
+public final class SegmentedRaftLog extends RaftLogBase {
   /**
    * I/O task definitions.
    */
@@ -145,6 +148,10 @@ public class SegmentedRaftLog extends RaftLogBase {
     /** Notify the server that a log entry is being truncated. */
     default void notifyTruncatedLogEntry(TermIndex ti) {
     }
+
+    default TransactionContext getTransactionContext(LogEntryProto entry, 
boolean createNew) {
+      return null;
+    }
   }
 
   /**
@@ -152,7 +159,8 @@ public class SegmentedRaftLog extends RaftLogBase {
    * Otherwise, the server is non-null, return the implementation using the 
given server.
    */
   private ServerLogMethods newServerLogMethods(RaftServer.Division impl,
-      Consumer<LogEntryProto> notifyTruncatedLogEntry) {
+      Consumer<LogEntryProto> notifyTruncatedLogEntry,
+      BiFunction<LogEntryProto, Boolean, TransactionContext> 
getTransactionContext) {
     if (impl == null) {
       return ServerLogMethods.DUMMY;
     }
@@ -177,6 +185,11 @@ public class SegmentedRaftLog extends RaftLogBase {
           LOG.error("{}: Failed to read log {}", getName(), ti, e);
         }
       }
+
+      @Override
+      public TransactionContext getTransactionContext(LogEntryProto entry, 
boolean createNew) {
+        return getTransactionContext.apply(entry, createNew);
+      }
     };
   }
 
@@ -190,22 +203,19 @@ public class SegmentedRaftLog extends RaftLogBase {
   private final boolean stateMachineCachingEnabled;
   private final SegmentedRaftLogMetrics metrics;
 
-  @SuppressWarnings("parameternumber")
-  public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division 
server,
-      StateMachine stateMachine, Consumer<LogEntryProto> 
notifyTruncatedLogEntry, Runnable submitUpdateCommitEvent,
-      RaftStorage storage, LongSupplier snapshotIndexSupplier, RaftProperties 
properties) {
-    super(memberId, snapshotIndexSupplier, properties);
-    this.metrics = new SegmentedRaftLogMetrics(memberId);
-
-    this.server = newServerLogMethods(server, notifyTruncatedLogEntry);
-    this.storage = storage;
-    this.stateMachine = stateMachine;
-    segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
-    this.cache = new SegmentedRaftLogCache(memberId, storage, properties, 
getRaftLogMetrics());
-    this.cacheEviction = new AwaitToRun(memberId + "-cacheEviction", 
this::checkAndEvictCache).start();
-    this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
-        submitUpdateCommitEvent, server, storage, properties, 
getRaftLogMetrics());
-    stateMachineCachingEnabled = 
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
+  private SegmentedRaftLog(Builder b) {
+    super(b.memberId, b.snapshotIndexSupplier, b.properties);
+    this.metrics = new SegmentedRaftLogMetrics(b.memberId);
+
+    this.server = newServerLogMethods(b.server, b.notifyTruncatedLogEntry, 
b.getTransactionContext);
+    this.storage = b.storage;
+    this.stateMachine = b.stateMachine;
+    this.segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(b.properties).getSize();
+    this.cache = new SegmentedRaftLogCache(b.memberId, storage, b.properties, 
getRaftLogMetrics());
+    this.cacheEviction = new AwaitToRun(b.memberId + "-cacheEviction", 
this::checkAndEvictCache).start();
+    this.fileLogWorker = new SegmentedRaftLogWorker(b.memberId, stateMachine,
+        b.submitUpdateCommitEvent, b.server, storage, b.properties, 
getRaftLogMetrics());
+    stateMachineCachingEnabled = 
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(b.properties);
   }
 
   @Override
@@ -298,7 +308,7 @@ public class SegmentedRaftLog extends RaftLogBase {
     try {
       CompletableFuture<ByteString> future = null;
       if (stateMachine != null) {
-        future = stateMachine.data().read(entry).exceptionally(ex -> {
+        future = stateMachine.data().read(entry, 
server.getTransactionContext(entry, false)).exceptionally(ex -> {
           stateMachine.event().notifyLogFailed(ex, entry);
           throw new CompletionException("Failed to read state machine data for 
log entry " + entry, ex);
         });
@@ -376,7 +386,7 @@ public class SegmentedRaftLog extends RaftLogBase {
   }
 
   @Override
-  protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
+  protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry, 
TransactionContext context) {
     checkLogState();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: appendEntry {}", getName(), 
LogProtoUtils.toLogEntryString(entry));
@@ -412,7 +422,7 @@ public class SegmentedRaftLog extends RaftLogBase {
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
       CompletableFuture<Long> writeFuture =
-          fileLogWorker.writeLogEntry(entry).getFuture();
+          fileLogWorker.writeLogEntry(entry, context).getFuture();
       if (stateMachineCachingEnabled) {
         // The stateMachineData will be cached inside the StateMachine itself.
         cache.appendEntry(LogProtoUtils.removeStateMachineData(entry),
@@ -460,7 +470,8 @@ public class SegmentedRaftLog extends RaftLogBase {
         futures = new ArrayList<>(entries.size() - index);
       }
       for (int i = index; i < entries.size(); i++) {
-        futures.add(appendEntry(entries.get(i)));
+        final LogEntryProto entry = entries.get(i);
+        futures.add(appendEntry(entry, server.getTransactionContext(entry, 
true)));
       }
       return futures;
     }
@@ -528,4 +539,73 @@ public class SegmentedRaftLog extends RaftLogBase {
   public String toLogEntryString(LogEntryProto logEntry) {
     return LogProtoUtils.toLogEntryString(logEntry, 
stateMachine::toStateMachineLogEntryString);
   }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+    private RaftGroupMemberId memberId;
+    private RaftServer.Division server;
+    private StateMachine stateMachine;
+    private Consumer<LogEntryProto> notifyTruncatedLogEntry;
+    private BiFunction<LogEntryProto, Boolean, TransactionContext> 
getTransactionContext;
+    private Runnable submitUpdateCommitEvent;
+    private RaftStorage storage;
+    private LongSupplier snapshotIndexSupplier = () -> 
RaftLog.INVALID_LOG_INDEX;
+    private RaftProperties properties;
+
+    private Builder() {}
+
+    public Builder setMemberId(RaftGroupMemberId memberId) {
+      this.memberId = memberId;
+      return this;
+    }
+
+    public Builder setServer(RaftServer.Division server) {
+      this.server = server;
+      this.stateMachine = server.getStateMachine();
+      return this;
+    }
+
+    public Builder setStateMachine(StateMachine stateMachine) {
+      this.stateMachine = stateMachine;
+      return this;
+    }
+
+    public Builder setNotifyTruncatedLogEntry(Consumer<LogEntryProto> 
notifyTruncatedLogEntry) {
+      this.notifyTruncatedLogEntry = notifyTruncatedLogEntry;
+      return this;
+    }
+
+    public Builder setGetTransactionContext(
+        BiFunction<LogEntryProto, Boolean, TransactionContext> 
getTransactionContext) {
+      this.getTransactionContext = getTransactionContext;
+      return this;
+    }
+
+    public Builder setSubmitUpdateCommitEvent(Runnable 
submitUpdateCommitEvent) {
+      this.submitUpdateCommitEvent = submitUpdateCommitEvent;
+      return this;
+    }
+
+    public Builder setStorage(RaftStorage storage) {
+      this.storage = storage;
+      return this;
+    }
+
+    public Builder setSnapshotIndexSupplier(LongSupplier 
snapshotIndexSupplier) {
+      this.snapshotIndexSupplier = snapshotIndexSupplier;
+      return this;
+    }
+
+    public Builder setProperties(RaftProperties properties) {
+      this.properties = properties;
+      return this;
+    }
+
+    public SegmentedRaftLog build() {
+      return new SegmentedRaftLog(this);
+    }
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 9f34f2917..18fd68012 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -37,6 +37,7 @@ import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog.Task;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -435,8 +436,8 @@ class SegmentedRaftLogWorker {
     addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
   }
 
-  Task writeLogEntry(LogEntryProto entry) {
-    return addIOTask(new WriteLog(entry));
+  Task writeLogEntry(LogEntryProto entry, TransactionContext context) {
+    return addIOTask(new WriteLog(entry, context));
   }
 
   Task truncate(TruncationSegments ts, long index) {
@@ -483,7 +484,7 @@ class SegmentedRaftLogWorker {
     private final CompletableFuture<?> stateMachineFuture;
     private final CompletableFuture<Long> combined;
 
-    WriteLog(LogEntryProto entry) {
+    WriteLog(LogEntryProto entry, TransactionContext context) {
       this.entry = LogProtoUtils.removeStateMachineData(entry);
       if (this.entry == entry) {
         final StateMachineLogEntryProto proto = entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
@@ -498,7 +499,7 @@ class SegmentedRaftLogWorker {
       } else {
         try {
           // this.entry != entry iff the entry has state machine data
-          this.stateMachineFuture = stateMachine.data().write(entry);
+          this.stateMachineFuture = stateMachine.data().write(entry, context);
         } catch (Exception e) {
           LOG.error(name + ": writeStateMachineData failed for index " + entry.getIndex()
               + ", entry=" + LogProtoUtils.toLogEntryString(entry, stateMachine::toStateMachineLogEntryString), e);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 958c19442..73482dcf8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -182,10 +182,15 @@ public class RaftServerTestUtil {
     final RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
     Mockito.when(server.getInfo()).thenReturn(info);
 
-    return new SegmentedRaftLog(memberId, server, null,
-        server::notifyTruncatedLogEntry,
-        server::submitUpdateCommitEvent,
-        storage, () -> -1, properties);
+    return SegmentedRaftLog.newBuilder()
+        .setMemberId(memberId)
+        .setServer(server)
+        .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry)
+        .setGetTransactionContext(server::getTransactionContext)
+        .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent)
+        .setStorage(storage)
+        .setProperties(properties)
+        .build();
   }
 
   public static boolean isHighestPriority(RaftConfiguration config, RaftPeerId peerId) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
index 9ab814cc6..f59958a94 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
@@ -81,8 +81,14 @@ public class RetryCacheTestUtil {
     when(server.getRetryCache()).thenReturn((RetryCacheImpl) retryCache);
     when(server.getMemberId()).thenReturn(memberId);
     doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
-    return new SegmentedRaftLog(memberId, server, null,
-        server::notifyTruncatedLogEntry, server::submitUpdateCommitEvent,
-        storage, () -> -1, properties);
+    return SegmentedRaftLog.newBuilder()
+        .setMemberId(memberId)
+        .setServer(server)
+        .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry)
+        .setGetTransactionContext(server::getTransactionContext)
+        .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent)
+        .setStorage(storage)
+        .setProperties(properties)
+        .build();
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 6dc75a3d3..abc36e4ee 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -136,14 +136,21 @@ public class TestSegmentedRaftLog extends BaseTest {
   }
 
   static SegmentedRaftLog newSegmentedRaftLog(RaftStorage storage, RaftProperties properties) {
-    return new SegmentedRaftLog(memberId, null, null, null, null, storage,
-        () -> -1, properties);
+    return SegmentedRaftLog.newBuilder()
+        .setMemberId(memberId)
+        .setStorage(storage)
+        .setProperties(properties)
+        .build();
   }
 
   private SegmentedRaftLog newSegmentedRaftLogWithSnapshotIndex(RaftStorage storage, RaftProperties properties,
                                                                 LongSupplier getSnapshotIndexFromStateMachine) {
-    return new SegmentedRaftLog(memberId, null, null, null, null, storage,
-        getSnapshotIndexFromStateMachine, properties);
+    return SegmentedRaftLog.newBuilder()
+        .setMemberId(memberId)
+        .setStorage(storage)
+        .setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine)
+        .setProperties(properties)
+        .build();
   }
 
   @Before
@@ -576,7 +583,12 @@ public class TestSegmentedRaftLog extends BaseTest {
     final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>());
 
     final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, () -> -1, properties)) {
+    try (SegmentedRaftLog raftLog = SegmentedRaftLog.newBuilder()
+        .setMemberId(memberId)
+        .setStateMachine(sm)
+        .setStorage(storage)
+        .setProperties(properties)
+        .build()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
 
       int next = 0;
@@ -641,7 +653,12 @@ public class TestSegmentedRaftLog extends BaseTest {
     };
 
     Throwable ex = null; // TimeoutIOException
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, () -> -1, properties)) {
+    try (SegmentedRaftLog raftLog = SegmentedRaftLog.newBuilder()
+        .setMemberId(memberId)
+        .setStateMachine(sm)
+        .setStorage(storage)
+        .setProperties(properties)
+        .build()) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // SegmentedRaftLogWorker should catch TimeoutIOException
       CompletableFuture<Long> f = raftLog.appendEntry(entry);


Reply via email to