Repository: incubator-ratis Updated Branches: refs/heads/master a82ced5dd -> 60d0bc163
RATIS-353. Refactor LogEntryProto. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/60d0bc16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/60d0bc16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/60d0bc16 Branch: refs/heads/master Commit: 60d0bc163182a52b5e0cb380d33e706a35e13d30 Parents: a82ced5 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Sun Oct 21 15:32:01 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Sun Oct 21 15:32:01 2018 +0800 ---------------------------------------------------------------------- .../filestore/FileStoreStateMachine.java | 8 +- ratis-proto/src/main/proto/Raft.proto | 29 +++-- .../apache/ratis/server/impl/LeaderState.java | 7 +- .../apache/ratis/server/impl/LogAppender.java | 2 +- .../ratis/server/impl/RaftServerImpl.java | 20 ++-- .../ratis/server/impl/ServerProtoUtils.java | 118 +++++++++++-------- .../apache/ratis/server/impl/ServerState.java | 5 +- .../apache/ratis/server/storage/RaftLog.java | 11 +- .../impl/TransactionContextImpl.java | 2 +- .../java/org/apache/ratis/RaftTestUtil.java | 12 +- .../ratis/server/impl/RetryCacheTestUtil.java | 18 +-- .../ratis/server/storage/TestCacheEviction.java | 6 +- .../ratis/server/storage/TestRaftLogCache.java | 12 +- .../server/storage/TestRaftLogReadWrite.java | 19 +-- .../server/storage/TestRaftLogSegment.java | 26 ++-- .../server/storage/TestSegmentedRaftLog.java | 9 +- .../SimpleStateMachine4Testing.java | 2 +- 17 files changed, 156 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java index b04437b..04bed04 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java @@ -100,9 +100,9 @@ public class FileStoreStateMachine extends BaseStateMachine { final WriteRequestProto write = proto.getWrite(); final FileStoreRequestProto newProto = FileStoreRequestProto.newBuilder() .setWriteHeader(write.getHeader()).build(); - log = ServerProtoUtils.toStateMachineLogEntryProto(newProto.toByteString(), write.getData()); + log = ServerProtoUtils.toStateMachineLogEntryProto(request, newProto.toByteString(), write.getData()); } else { - log = ServerProtoUtils.toStateMachineLogEntryProto(content, null); + log = ServerProtoUtils.toStateMachineLogEntryProto(request, content, null); } return new TransactionContextImpl(this, request, log); @@ -125,7 +125,7 @@ public class FileStoreStateMachine extends BaseStateMachine { final WriteRequestHeaderProto h = proto.getWriteHeader(); final CompletableFuture<Integer> f = files.write(entry.getIndex(), - h.getPath().toStringUtf8(), h.getClose(), h.getOffset(), smLog.getStateMachineData()); + h.getPath().toStringUtf8(), h.getClose(), h.getOffset(), smLog.getStateMachineEntry().getStateMachineData()); // sync only if closing the file return h.getClose()? f: null; } @@ -150,7 +150,7 @@ public class FileStoreStateMachine extends BaseStateMachine { case DELETE: return delete(index, request.getDelete()); case WRITEHEADER: - return writeCommit(index, request.getWriteHeader(), smLog.getStateMachineData().size()); + return writeCommit(index, request.getWriteHeader(), smLog.getStateMachineEntry().getStateMachineData().size()); case WRITE: // WRITE should not happen here since // startTransaction converts WRITE requests to WRITEHEADER requests. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-proto/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index 0640954..fbebe9f 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -40,14 +40,32 @@ message RaftConfigurationProto { repeated RaftPeerProto oldPeers = 2; // the peers in the old conf } +message StateMachineEntryProto { + /** + * StateMachine specific data which is not written to log. + * Unlike logEntryData, stateMachineData is managed and stored by the StateMachine but not the RaftLog. + */ + bytes stateMachineData = 1; + /** + * When stateMachineData is missing, it is the size of the serialized LogEntryProto along with stateMachineData. + * When stateMachineData is not missing, it must be set to zero. + */ + uint32 logEntryProtoSerializedSize = 2; +} + message StateMachineLogEntryProto { // TODO: This is not super efficient if the SM itself uses PB to serialize its own data for a /** RaftLog entry data */ bytes logData = 1; + /** + * StateMachine entry. + * StateMachine implementation may use this field to separate StateMachine specific data from the RaftLog data. + */ + StateMachineEntryProto stateMachineEntry = 2; - bytes stateMachineData = 2; // State machine specific data which is not written to log. - bool stateMachineDataAttached = 3; // set this flag when state machine data is attached. - uint64 serializedProtobufSize = 4; // size of the serialized LogEntryProto along with stateMachineData + // clientId and callId are used to rebuild the retry cache. + bytes clientId = 14; + uint64 callId = 15; } message LogEntryProto { @@ -58,11 +76,6 @@ message LogEntryProto { StateMachineLogEntryProto stateMachineLogEntry = 3; RaftConfigurationProto configurationEntry = 4; } - - // clientId and callId are used to rebuild the retry cache. They're not - // necessary for configuration change since re-conf is idempotent. - bytes clientId = 6; - uint64 callId = 7; } message TermIndexProto { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 41cce99..c4c4fe2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -216,11 +216,8 @@ public class LeaderState { // In the beginning of the new term, replicate a conf entry in order // to finally commit entries in the previous term. // Also this message can help identify the last committed index and the conf. - final LogEntryProto placeHolder = LogEntryProto.newBuilder() - .setTerm(server.getState().getCurrentTerm()) - .setIndex(raftLog.getNextIndex()) - .setConfigurationEntry(ServerProtoUtils.toRaftConfigurationProto(server.getRaftConf())) - .build(); + final LogEntryProto placeHolder = ServerProtoUtils.toLogEntryProto( + server.getRaftConf(), server.getState().getCurrentTerm(), raftLog.getNextIndex()); CodeInjectionForTesting.execute(APPEND_PLACEHOLDER, server.getId().toString(), null); raftLog.append(placeHolder); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 529bd7d..0542f8b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -143,7 +143,7 @@ public class LogAppender { * otherwise, the entry is not added, return false. */ boolean addEntry(EntryWithData entry) { - final long entrySize = entry.getSerializedSize(); + final int entrySize = entry.getSerializedSize(); if (totalSize + entrySize <= maxBufferSize) { buf.add(entry); totalSize += entrySize; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 4f767ff..896e550 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -489,8 +489,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou final LeaderState leaderState = role.getLeaderStateNonNull(); final long entryIndex; try { - entryIndex = state.applyLog(context, request.getClientId(), - request.getCallId()); + entryIndex = state.applyLog(context); } catch (StateMachineException e) { // the StateMachineException is thrown by the SM in the preAppend stage. // Return the exception in a RaftClientReply. @@ -1041,12 +1040,13 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou */ private CompletableFuture<Message> replyPendingRequest( LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) { + Preconditions.assertTrue(logEntry.hasStateMachineLogEntry()); + final StateMachineLogEntryProto smLog = logEntry.getStateMachineLogEntry(); // update the retry cache - final ClientId clientId = ClientId.valueOf(logEntry.getClientId()); - final long callId = logEntry.getCallId(); + final ClientId clientId = ClientId.valueOf(smLog.getClientId()); + final long callId = smLog.getCallId(); final RaftPeerId serverId = getId(); - final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry( - clientId, logEntry.getCallId()); + final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry(clientId, callId); if (cacheEntry.isFailed()) { retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey())); } @@ -1117,11 +1117,13 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou public void failClientRequest(LogEntryProto logEntry) { if (logEntry.hasStateMachineLogEntry()) { - final ClientId clientId = ClientId.valueOf(logEntry.getClientId()); - final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, logEntry.getCallId()); + final StateMachineLogEntryProto smLog = logEntry.getStateMachineLogEntry(); + final ClientId clientId = ClientId.valueOf(smLog.getClientId()); + final long callId = smLog.getCallId(); + final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, callId); if (cacheEntry != null) { final RaftClientReply reply = new RaftClientReply(clientId, getId(), getGroupId(), - logEntry.getCallId(), false, null, generateNotLeaderException(), + callId, false, null, generateNotLeaderException(), logEntry.getIndex(), getCommitInfos()); cacheEntry.failWithReply(reply); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 606cd2b..938cbdf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -21,6 +21,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -32,6 +33,7 @@ import org.apache.ratis.util.ProtoUtils; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; @@ -59,14 +61,19 @@ public interface ServerProtoUtils { return TermIndex.toString(entry.getTerm(), entry.getIndex()); } - public static String toLogEntryString(LogEntryProto entry) { + static String toLogEntryString(LogEntryProto entry) { if (entry == null) { return null; } - final ByteString clientId = entry.getClientId(); - return toTermIndexString(entry) + entry.getLogEntryBodyCase() - + ", " + (clientId.isEmpty()? "<empty clientId>": ClientId.valueOf(clientId)) - + ", cid=" + entry.getCallId(); + final String s; + if (entry.hasStateMachineLogEntry()) { + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); + final ByteString clientId = smLog.getClientId(); + s = ", " + (clientId.isEmpty()? "<empty clientId>": ClientId.valueOf(clientId)) + ", cid=" + smLog.getCallId(); + } else { + s = ""; + } + return toTermIndexString(entry) + ", " + entry.getLogEntryBodyCase() + s; } public static String toString(LogEntryProto... entries) { @@ -88,12 +95,10 @@ public interface ServerProtoUtils { + reply.getReplyId().toStringUtf8() + "," + reply.getSuccess(); } - public static RaftConfigurationProto toRaftConfigurationProto( - RaftConfiguration conf) { + static RaftConfigurationProto.Builder toRaftConfigurationProto(RaftConfiguration conf) { return RaftConfigurationProto.newBuilder() .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInConf())) - .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInOldConf())) - .build(); + .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInOldConf())); } static RaftConfiguration toRaftConfiguration(LogEntryProto entry) { @@ -116,33 +121,57 @@ public interface ServerProtoUtils { .build(); } - static LogEntryProto toLogEntryProto(StateMachineLogEntryProto smLogEntry, long term, long index, - ClientId clientId, long callId) { + static LogEntryProto toLogEntryProto(StateMachineLogEntryProto smLog, long term, long index) { return LogEntryProto.newBuilder() .setTerm(term) .setIndex(index) - .setStateMachineLogEntry(smLogEntry) - .setClientId(clientId.toByteString()) - .setCallId(callId) + .setStateMachineLogEntry(smLog) .build(); } + static StateMachineEntryProto.Builder toStateMachineEntryProtoBuilder(ByteString stateMachineData) { + return StateMachineEntryProto.newBuilder().setStateMachineData(stateMachineData); + } + + static StateMachineEntryProto.Builder toStateMachineEntryProtoBuilder(int logEntryProtoSerializedSize) { + return StateMachineEntryProto.newBuilder().setLogEntryProtoSerializedSize(logEntryProtoSerializedSize); + } + static StateMachineLogEntryProto toStateMachineLogEntryProto( - ByteString logData, ByteString stateMachineData) { + RaftClientRequest request, ByteString logData, ByteString stateMachineData) { + if (logData == null) { + logData = request.getMessage().getContent(); + } + return toStateMachineLogEntryProto(request.getClientId(), request.getCallId(), logData, stateMachineData); + } + + static StateMachineLogEntryProto toStateMachineLogEntryProto( + ClientId clientId, long callId, ByteString logData, ByteString stateMachineData) { final StateMachineLogEntryProto.Builder b = StateMachineLogEntryProto.newBuilder() + .setClientId(clientId.toByteString()) + .setCallId(callId) .setLogData(logData); if (stateMachineData != null) { - b.setStateMachineData(stateMachineData); + b.setStateMachineEntry(toStateMachineEntryProtoBuilder(stateMachineData)); } return b.build(); } + static Optional<StateMachineEntryProto> getStateMachineEntry(LogEntryProto entry) { + return Optional.of(entry) + .filter(LogEntryProto::hasStateMachineLogEntry) + .map(LogEntryProto::getStateMachineLogEntry) + .filter(StateMachineLogEntryProto::hasStateMachineEntry) + .map(StateMachineLogEntryProto::getStateMachineEntry); + } + + static Optional<ByteString> getStateMachineData(LogEntryProto entry) { + return getStateMachineEntry(entry) + .map(StateMachineEntryProto::getStateMachineData); + } + static boolean shouldReadStateMachineData(LogEntryProto entry) { - if (!entry.hasStateMachineLogEntry()) { - return false; - } - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - return smLog.getStateMachineDataAttached() && smLog.getStateMachineData().isEmpty(); + return getStateMachineData(entry).map(ByteString::isEmpty).orElse(false); } /** @@ -153,21 +182,16 @@ public interface ServerProtoUtils { * otherwise, return the given entry. */ static LogEntryProto removeStateMachineData(LogEntryProto entry) { - if (!entry.hasStateMachineLogEntry()) { - return entry; - } - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - if (smLog.getStateMachineData().isEmpty()) { - return entry; - } - // build a new LogEntryProto without state machine data - // and mark that it has been removed - return LogEntryProto.newBuilder(entry) - .setStateMachineLogEntry(StateMachineLogEntryProto.newBuilder() - .setLogData(smLog.getLogData()) - .setStateMachineDataAttached(true) - .setSerializedProtobufSize(entry.getSerializedSize())) - .build(); + return getStateMachineData(entry) + .filter(stateMachineData -> !stateMachineData.isEmpty()) + .map(_dummy -> rebuildLogEntryProto(entry, toStateMachineEntryProtoBuilder(entry.getSerializedSize()))) + .orElse(entry); + } + + static LogEntryProto rebuildLogEntryProto(LogEntryProto entry, StateMachineEntryProto.Builder smEntry) { + return LogEntryProto.newBuilder(entry).setStateMachineLogEntry( + StateMachineLogEntryProto.newBuilder(entry.getStateMachineLogEntry()).setStateMachineEntry(smEntry) + ).build(); } /** @@ -177,22 +201,16 @@ public interface ServerProtoUtils { * @return LogEntryProto with stateMachineData added */ static LogEntryProto addStateMachineData(ByteString stateMachineData, LogEntryProto entry) { - final StateMachineLogEntryProto smLogEntryProto = StateMachineLogEntryProto.newBuilder(entry.getStateMachineLogEntry()) - .setStateMachineData(stateMachineData) - .build(); - return LogEntryProto.newBuilder(entry).setStateMachineLogEntry(smLogEntryProto).build(); + Preconditions.assertTrue(shouldReadStateMachineData(entry), + () -> "Failed to addStateMachineData to " + entry + " since shouldReadStateMachineData is false."); + return rebuildLogEntryProto(entry, toStateMachineEntryProtoBuilder(stateMachineData)); } - static long getSerializedSize(LogEntryProto entry) { - if (!entry.hasStateMachineLogEntry()) { - return entry.getSerializedSize(); - } - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - if (!smLog.getStateMachineDataAttached()) { - // if state machine data was never set, return the proto serialized size - return entry.getSerializedSize(); - } - return smLog.getSerializedProtobufSize(); + static int getSerializedSize(LogEntryProto entry) { + return getStateMachineEntry(entry) + .filter(smEnty -> smEnty.getStateMachineData().isEmpty()) + .map(StateMachineEntryProto::getLogEntryProtoSerializedSize) + .orElseGet(entry::getSerializedSize); } static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 63737a3..f371c5f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -274,9 +274,8 @@ public class ServerState implements Closeable { return log; } - long applyLog(TransactionContext operation, ClientId clientId, long callId) - throws StateMachineException { - return log.append(currentTerm, operation, clientId, callId); + long applyLog(TransactionContext operation) throws StateMachineException { + return log.append(currentTerm, operation); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 261f3b0..effc027 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -18,7 +18,6 @@ package org.apache.ratis.server.storage; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.impl.LogAppender; @@ -137,8 +136,7 @@ public abstract class RaftLog implements Closeable { * Used by the leader. * @return the index of the new log entry. */ - public long append(long term, TransactionContext operation, - ClientId clientId, long callId) throws StateMachineException { + public long append(long term, TransactionContext operation) throws StateMachineException { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { final long nextIndex = getNextIndex(); @@ -152,8 +150,7 @@ public abstract class RaftLog implements Closeable { } // build the log entry after calling the StateMachine - final LogEntryProto e = ServerProtoUtils.toLogEntryProto( - operation.getStateMachineLogEntry(), term, nextIndex, clientId, callId); + final LogEntryProto e = ServerProtoUtils.toLogEntryProto(operation.getStateMachineLogEntry(), term, nextIndex); int entrySize = e.getSerializedSize(); if (entrySize > maxBufferSize) { @@ -261,7 +258,7 @@ public abstract class RaftLog implements Closeable { * If an existing entry conflicts with a new one (same index but different * terms), delete the existing entry and all entries that follow it (§5.3). * - * This method, {@link #append(long, TransactionContext, ClientId, long)}, + * This method, {@link #append(long, TransactionContext)}, * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)}, * do not guarantee the changes are persisted. * Need to wait for the returned futures to persist the changes. @@ -358,7 +355,7 @@ public abstract class RaftLog implements Closeable { this.future = future; } - public long getSerializedSize() { + public int getSerializedSize() { return ServerProtoUtils.getSerializedSize(logEntry); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java index fd18d46..5e4936a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java @@ -90,7 +90,7 @@ public class TransactionContextImpl implements TransactionContext { this(RaftPeerRole.LEADER, stateMachine); this.clientRequest = clientRequest; this.smLogEntryProto = smLogEntryProto != null? smLogEntryProto - : ServerProtoUtils.toStateMachineLogEntryProto(clientRequest.getMessage().getContent(), null); + : ServerProtoUtils.toStateMachineLogEntryProto(clientRequest, null, null); this.stateMachineContext = stateMachineContext; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index fe25083..60629f9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -20,6 +20,7 @@ package org.apache.ratis; import org.apache.ratis.client.RaftClient; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; @@ -45,6 +46,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.IntSupplier; import java.util.function.Predicate; @@ -268,12 +270,20 @@ public interface RaftTestUtil { } class SimpleOperation { + private static final ClientId clientId = ClientId.randomId(); + private static final AtomicLong callId = new AtomicLong(); + private final String op; private final StateMachineLogEntryProto smLogEntryProto; public SimpleOperation(String op) { + this(clientId, callId.incrementAndGet(), op); + } + + private SimpleOperation(ClientId clientId, long callId, String op) { this.op = Objects.requireNonNull(op); - this.smLogEntryProto = ServerProtoUtils.toStateMachineLogEntryProto(ProtoUtils.toByteString(op), null); + this.smLogEntryProto = ServerProtoUtils.toStateMachineLogEntryProto( + clientId, callId, ProtoUtils.toByteString(op), null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java index 039a2d8..0212eab 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java @@ -17,8 +17,9 @@ */ package org.apache.ratis.server.impl; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.util.TimeDuration; import org.junit.Assert; @@ -29,19 +30,20 @@ public class RetryCacheTestUtil { return new RetryCache(5000, TimeDuration.valueOf(60, TimeUnit.SECONDS)); } - public static void createEntry(RetryCache cache, RaftProtos.LogEntryProto logEntry){ + public static void createEntry(RetryCache cache, LogEntryProto logEntry){ if(logEntry.hasStateMachineLogEntry()) { - ClientId clientId = ClientId.valueOf(logEntry.getClientId()); - long callId = logEntry.getCallId(); + final StateMachineLogEntryProto smLogEntry = logEntry.getStateMachineLogEntry(); + final ClientId clientId = ClientId.valueOf(smLogEntry.getClientId()); + final long callId = smLogEntry.getCallId(); cache.getOrCreateEntry(clientId, callId); } } - public static void assertFailure(RetryCache cache, - RaftProtos.LogEntryProto logEntry, boolean isFailed) { + public static void assertFailure(RetryCache cache, LogEntryProto logEntry, boolean isFailed) { if(logEntry.hasStateMachineLogEntry()) { - ClientId clientId = ClientId.valueOf(logEntry.getClientId()); - long callId = logEntry.getCallId(); + final StateMachineLogEntryProto smLogEntry = logEntry.getStateMachineLogEntry(); + final ClientId clientId = ClientId.valueOf(smLogEntry.getClientId()); + final long callId = smLogEntry.getCallId(); Assert.assertEquals(isFailed, cache.get(clientId, callId).isFailed()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java index 129c65c..1cd41a5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java @@ -21,7 +21,6 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; @@ -46,8 +45,6 @@ import java.util.concurrent.CompletableFuture; public class TestCacheEviction extends BaseTest { private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault(); - private static final ClientId clientId = ClientId.randomId(); - private static final long callId = 0; private List<LogSegment> prepareSegments(int numSegments, boolean[] cached, long start, long size) { Assert.assertEquals(numSegments, cached.length); @@ -193,8 +190,7 @@ public class TestCacheEviction extends BaseTest { for (SegmentRange range : slist) { for (long index = range.start; index <= range.end; index++) { SimpleOperation m = new SimpleOperation(new String(new byte[1024])); - eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), - range.term, index, clientId, callId)); + eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index)); } } return eList.toArray(new LogEntryProto[eList.size()]); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java index f9bdd6a..d3c216d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -22,7 +22,6 @@ import java.util.Iterator; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; @@ -32,8 +31,6 @@ import org.junit.Before; import org.junit.Test; public class TestRaftLogCache { - private static final ClientId clientId = ClientId.randomId(); - private static final long callId = 0; private static final RaftProperties prop = new RaftProperties(); private RaftLogCache cache; @@ -47,8 +44,7 @@ public class TestRaftLogCache { LogSegment s = LogSegment.newOpenSegment(null, start); for (long i = start; i <= end; i++) { SimpleOperation m = new SimpleOperation("m" + i); - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, i, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); s.appendToOpenSegment(entry); } if (!isOpen) { @@ -137,8 +133,7 @@ public class TestRaftLogCache { final SimpleOperation m = new SimpleOperation("m"); try { - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, 0, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0); cache.appendEntry(entry); Assert.fail("the open segment is null"); } catch (IllegalStateException ignored) { @@ -147,8 +142,7 @@ public class TestRaftLogCache { LogSegment openSegment = prepareLogSegment(100, 100, true); cache.addSegment(openSegment); for (long index = 101; index < 200; index++) { - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, index, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index); cache.appendEntry(entry); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java index 49ae37c..f3b314f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java @@ -21,7 +21,6 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ChecksumException; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; @@ -47,9 +46,6 @@ import java.util.List; * Test basic functionality of LogReader, LogInputStream, and LogOutputStream. */ public class TestRaftLogReadWrite extends BaseTest { - private static final ClientId clientId = ClientId.randomId(); - private static final long callId = 0; - private File storageDir; private long segmentMaxSize; private long preallocatedSize; @@ -93,8 +89,7 @@ public class TestRaftLogReadWrite extends BaseTest { long size = 0; for (int i = 0; i < entries.length; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, - clientId, callId); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); final int s = entries[i].getSerializedSize(); size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4; out.write(entries[i]); @@ -137,8 +132,7 @@ public class TestRaftLogReadWrite extends BaseTest { preallocatedSize, bufferSize)) { for (int i = 0; i < 100; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, - clientId, callId); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); out.write(entries[i]); } } @@ -148,8 +142,7 @@ public class TestRaftLogReadWrite extends BaseTest { preallocatedSize, bufferSize)) { for (int i = 100; i < 200; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, - clientId, callId); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); out.write(entries[i]); } } @@ -205,8 +198,7 @@ public class TestRaftLogReadWrite extends BaseTest { 16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize); for (int i = 0; i < 10; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, - clientId, callId); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); out.write(entries[i]); } out.flush(); @@ -254,8 +246,7 @@ public class TestRaftLogReadWrite extends BaseTest { preallocatedSize, bufferSize)) { for (int i = 0; i < 100; i++) { LogEntryProto entry = ServerProtoUtils.toLogEntryProto( - new SimpleOperation("m" + i).getLogEntryContent(), 0, i, - clientId, callId); + new SimpleOperation("m" + i).getLogEntryContent(), 0, i); out.write(entry); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index 24f803c..a6150aa 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -20,7 +20,6 @@ package org.apache.ratis.server.storage; import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; import org.apache.ratis.server.impl.ServerProtoUtils; @@ -51,9 +50,6 @@ import static org.apache.ratis.server.storage.LogSegment.getEntrySize; * Test basic functionality of {@link LogSegment} */ public class TestRaftLogSegment extends BaseTest { - private static final ClientId clientId = ClientId.randomId(); - private static final long callId = 0; - private File storageDir; private long segmentMaxSize; private long preallocatedSize; @@ -90,8 +86,7 @@ public class TestRaftLogSegment extends BaseTest { segmentMaxSize, preallocatedSize, bufferSize)) { for (int i = 0; i < size; i++) { SimpleOperation op = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), - term, i + start, clientId, callId); + entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + start); out.write(entries[i]); } } @@ -164,8 +159,7 @@ public class TestRaftLogSegment extends BaseTest { List<LogEntryProto> list = new ArrayList<>(); while (size < max) { SimpleOperation op = new SimpleOperation("m" + i); - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), - term, i++ + start, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start); size += getEntrySize(entry); list.add(entry); } @@ -181,18 +175,18 @@ public class TestRaftLogSegment extends BaseTest { SimpleOperation op = new SimpleOperation("m"); final StateMachineLogEntryProto m = op.getLogEntryContent(); try { - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1001, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1001); segment.appendToOpenSegment(entry); Assert.fail("should fail since the entry's index needs to be 1000"); } catch (IllegalStateException e) { // the exception is expected. } - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1000, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1000); segment.appendToOpenSegment(entry); try { - entry = ServerProtoUtils.toLogEntryProto(m, 0, 1002, clientId, callId); + entry = ServerProtoUtils.toLogEntryProto(m, 0, 1002); segment.appendToOpenSegment(entry); Assert.fail("should fail since the entry's index needs to be 1001"); } catch (IllegalStateException e) { @@ -201,7 +195,7 @@ public class TestRaftLogSegment extends BaseTest { LogEntryProto[] entries = new LogEntryProto[2]; for (int i = 0; i < 2; i++) { - entries[i] = ServerProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2, clientId, callId); + entries[i] = ServerProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2); } try { segment.appendToOpenSegment(entries); @@ -218,7 +212,7 @@ public class TestRaftLogSegment extends BaseTest { LogSegment segment = LogSegment.newOpenSegment(null, start); for (int i = 0; i < 100; i++) { LogEntryProto entry = ServerProtoUtils.toLogEntryProto( - new SimpleOperation("m" + i).getLogEntryContent(), term, i + start, clientId, callId); + new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); segment.appendToOpenSegment(entry); } @@ -273,8 +267,7 @@ public class TestRaftLogSegment extends BaseTest { try (LogOutputStream out = new LogOutputStream(file, false, 1024, 1024, bufferSize)) { SimpleOperation op = new SimpleOperation(new String(content)); - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), - 0, 0, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); size = LogSegment.getEntrySize(entry); out.write(entry); } @@ -301,8 +294,7 @@ public class TestRaftLogSegment extends BaseTest { final byte[] content = new byte[1024]; Arrays.fill(content, (byte) 1); SimpleOperation op = new SimpleOperation(new String(content)); - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), - 0, 0, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); final long entrySize = LogSegment.getEntrySize(entry); long totalSize = SegmentedRaftLog.HEADER_BYTES.length; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 919d8e7..9971d30 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -21,7 +21,6 @@ import org.apache.log4j.Level; import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; @@ -59,8 +58,6 @@ public class TestSegmentedRaftLog extends BaseTest { } private static final RaftPeerId peerId = RaftPeerId.valueOf("s0"); - private static final ClientId clientId = ClientId.randomId(); - private static final long callId = 0; static class SegmentRange { final long start; @@ -117,8 +114,7 @@ public class TestSegmentedRaftLog extends BaseTest { segmentMaxSize, preallocatedSize, bufferSize)) { for (int i = 0; i < size; i++) { SimpleOperation m = new SimpleOperation("m" + (i + range.start)); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), - range.term, i + range.start, clientId, callId); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + range.start); out.write(entries[i]); } } @@ -182,8 +178,7 @@ public class TestSegmentedRaftLog extends BaseTest { SimpleOperation m = stringSupplier == null ? new SimpleOperation("m" + index) : new SimpleOperation(stringSupplier.get()); - eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), - range.term, index, clientId, index)); + eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index)); } } return eList; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/60d0bc16/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 325683d..9a7267b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -284,7 +284,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { public TransactionContext startTransaction(RaftClientRequest request) throws IOException { blocking.await(Blocking.Type.START_TRANSACTION); return new TransactionContextImpl(this, request, - ServerProtoUtils.toStateMachineLogEntryProto(request.getMessage().getContent(), STATE_MACHINE_DATA)); + ServerProtoUtils.toStateMachineLogEntryProto(request, null, STATE_MACHINE_DATA)); } @Override
