Repository: incubator-ratis Updated Branches: refs/heads/master 4bd32cda3 -> a82ced5dd
RATIS-357. In ProtoUtils, remove isConfigurationLogEntry(..) and move out the LogEntryProto methods. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a82ced5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a82ced5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a82ced5d Branch: refs/heads/master Commit: a82ced5ddb405fb5e12fb5c94c3d21aca65b9660 Parents: 4bd32cd Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Sat Oct 20 10:32:00 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Sat Oct 20 10:32:00 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/ProtoUtils.java | 87 +++----------------- .../ratis/server/impl/RaftServerImpl.java | 2 +- .../ratis/server/impl/ServerProtoUtils.java | 76 ++++++++++++++++- .../apache/ratis/server/impl/ServerState.java | 20 ++--- .../apache/ratis/server/storage/LogSegment.java | 6 +- .../ratis/server/storage/MemoryRaftLog.java | 3 +- .../apache/ratis/server/storage/RaftLog.java | 9 +- .../ratis/server/storage/RaftLogWorker.java | 2 +- .../ratis/server/storage/SegmentedRaftLog.java | 3 +- .../java/org/apache/ratis/RaftTestUtil.java | 5 +- .../impl/RaftReconfigurationBaseTest.java | 9 +- .../ratis/server/storage/TestCacheEviction.java | 4 +- .../ratis/server/storage/TestRaftLogCache.java | 8 +- .../server/storage/TestRaftLogReadWrite.java | 12 +-- .../server/storage/TestRaftLogSegment.java | 20 ++--- .../server/storage/TestSegmentedRaftLog.java | 6 +- 16 files changed, 133 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index dff4721..e30aab1 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -17,11 +17,20 @@ */ package org.apache.ratis.util; -import org.apache.ratis.protocol.*; +import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; +import org.apache.ratis.proto.RaftProtos.RaftGroupIdProto; +import org.apache.ratis.proto.RaftProtos.RaftGroupProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerProto; +import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto; +import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.ServiceException; -import org.apache.ratis.proto.RaftProtos.*; -import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; import java.io.IOException; import java.io.ObjectInputStream; @@ -146,78 +155,6 @@ public interface ProtoUtils { return protos.stream().map(ProtoUtils::toString).collect(Collectors.toList()).toString(); } - static boolean isConfigurationLogEntry(LogEntryProto entry) { - return entry.getLogEntryBodyCase() == - LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; - } - - static LogEntryProto toLogEntryProto( - StateMachineLogEntryProto operation, long term, long index, - ClientId clientId, long callId) { - return LogEntryProto.newBuilder().setTerm(term).setIndex(index) - .setStateMachineLogEntry(operation) - .setClientId(clientId.toByteString()).setCallId(callId) - .build(); - } - - static boolean shouldReadStateMachineData(LogEntryProto entry) { - if (!entry.hasStateMachineLogEntry()) { - return false; - } - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - return smLog.getStateMachineDataAttached() && smLog.getStateMachineData().isEmpty(); - } - - /** - * If the given entry is {@link LogEntryBodyCase#STATEMACHINELOGENTRY} and it has state machine data, - * build a new entry without the state machine data. - * - * @return a new entry without the state machine data if the given has state machine data; - * otherwise, return the given entry. - */ - static LogEntryProto removeStateMachineData(LogEntryProto entry) { - if (!entry.hasStateMachineLogEntry()) { - return entry; - } - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - if (smLog.getStateMachineData().isEmpty()) { - return entry; - } - // build a new LogEntryProto without state machine data - // and mark that it has been removed - return LogEntryProto.newBuilder(entry) - .setStateMachineLogEntry(StateMachineLogEntryProto.newBuilder() - .setLogData(smLog.getLogData()) - .setStateMachineDataAttached(true) - .setSerializedProtobufSize(entry.getSerializedSize())) - .build(); - } - - /** - * Return a new log entry based on the input log entry with stateMachineData added. - * @param stateMachineData - state machine data to be added - * @param entry - log entry to which stateMachineData needs to be added - * @return LogEntryProto with stateMachineData added - */ - static LogEntryProto addStateMachineData(ByteString stateMachineData, LogEntryProto entry) { - final StateMachineLogEntryProto smLogEntryProto = StateMachineLogEntryProto.newBuilder(entry.getStateMachineLogEntry()) - .setStateMachineData(stateMachineData) - .build(); - return LogEntryProto.newBuilder(entry).setStateMachineLogEntry(smLogEntryProto).build(); - } - - static long getSerializedSize(LogEntryProto entry) { - if (!entry.hasStateMachineLogEntry()) { - return entry.getSerializedSize(); - } - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - if (!smLog.getStateMachineDataAttached()) { - // if state machine data was never set, return the proto serialized size - return entry.getSerializedSize(); - } - return smLog.getSerializedProtobufSize(); - } - static IOException toIOException(ServiceException se) { final Throwable t = se.getCause(); if (t == null) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 115ea22..4f767ff 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -327,7 +327,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou // start sending AppendEntries RPC to followers final LogEntryProto e = role.startLeaderState(this, getProxy().getProperties()); - getState().setRaftConf(e.getIndex(), ServerProtoUtils.toRaftConfiguration(e)); + getState().setRaftConf(e); } Collection<CommitInfoProto> getCommitInfos() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 6fbce46..606cd2b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -96,8 +96,8 @@ public interface ServerProtoUtils { .build(); } - public static RaftConfiguration toRaftConfiguration(LogEntryProto entry) { - Preconditions.assertTrue(ProtoUtils.isConfigurationLogEntry(entry)); + static RaftConfiguration toRaftConfiguration(LogEntryProto entry) { + Preconditions.assertTrue(entry.hasConfigurationEntry()); final RaftConfigurationProto proto = entry.getConfigurationEntry(); final RaftConfiguration.Builder b = RaftConfiguration.newBuilder() .setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList())) @@ -108,8 +108,7 @@ public interface ServerProtoUtils { return b.build(); } - public static LogEntryProto toLogEntryProto( - RaftConfiguration conf, long term, long index) { + static LogEntryProto toLogEntryProto(RaftConfiguration conf, long term, long index) { return LogEntryProto.newBuilder() .setTerm(term) .setIndex(index) @@ -117,6 +116,17 @@ public interface ServerProtoUtils { .build(); } + static LogEntryProto toLogEntryProto(StateMachineLogEntryProto smLogEntry, long term, long index, + ClientId clientId, long callId) { + return LogEntryProto.newBuilder() + .setTerm(term) + .setIndex(index) + .setStateMachineLogEntry(smLogEntry) + .setClientId(clientId.toByteString()) + .setCallId(callId) + .build(); + } + static StateMachineLogEntryProto toStateMachineLogEntryProto( ByteString logData, ByteString stateMachineData) { final StateMachineLogEntryProto.Builder b = StateMachineLogEntryProto.newBuilder() @@ -127,6 +137,64 @@ public interface ServerProtoUtils { return b.build(); } + static boolean shouldReadStateMachineData(LogEntryProto entry) { + if (!entry.hasStateMachineLogEntry()) { + return false; + } + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); + return smLog.getStateMachineDataAttached() && smLog.getStateMachineData().isEmpty(); + } + + /** + * If the given entry has state machine log entry and it has state machine data, + * build a new entry without the state machine data. + * + * @return a new entry without the state machine data if the given has state machine data; + * otherwise, return the given entry. + */ + static LogEntryProto removeStateMachineData(LogEntryProto entry) { + if (!entry.hasStateMachineLogEntry()) { + return entry; + } + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); + if (smLog.getStateMachineData().isEmpty()) { + return entry; + } + // build a new LogEntryProto without state machine data + // and mark that it has been removed + return LogEntryProto.newBuilder(entry) + .setStateMachineLogEntry(StateMachineLogEntryProto.newBuilder() + .setLogData(smLog.getLogData()) + .setStateMachineDataAttached(true) + .setSerializedProtobufSize(entry.getSerializedSize())) + .build(); + } + + /** + * Return a new log entry based on the input log entry with stateMachineData added. + * @param stateMachineData - state machine data to be added + * @param entry - log entry to which stateMachineData needs to be added + * @return LogEntryProto with stateMachineData added + */ + static LogEntryProto addStateMachineData(ByteString stateMachineData, LogEntryProto entry) { + final StateMachineLogEntryProto smLogEntryProto = StateMachineLogEntryProto.newBuilder(entry.getStateMachineLogEntry()) + .setStateMachineData(stateMachineData) + .build(); + return LogEntryProto.newBuilder(entry).setStateMachineLogEntry(smLogEntryProto).build(); + } + + static long getSerializedSize(LogEntryProto entry) { + if (!entry.hasStateMachineLogEntry()) { + return entry.getSerializedSize(); + } + final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); + if (!smLog.getStateMachineDataAttached()) { + // if state machine data was never set, return the proto serialized size + return entry.getSerializedSize(); + } + return smLog.getSerializedProtobufSize(); + } + static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, boolean success) { return ClientProtoUtils.toRaftRpcReplyProtoBuilder( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 6d0477e..63737a3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -27,7 +27,6 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; -import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.Timestamp; import java.io.Closeable; @@ -45,7 +44,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static org.apache.ratis.server.impl.RaftServerImpl.LOG; -import static org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; /** * Common states of a raft peer. Protected by RaftServer's lock. @@ -113,11 +111,7 @@ public class ServerState implements Closeable { // we cannot apply log entries to the state machine in this step, since we // do not know whether the local log entries have been committed. - log = initLog(id, prop, lastApplied, entry -> { - if (entry.getLogEntryBodyCase() == CONFIGURATIONENTRY) { - setRaftConf(entry.getIndex(), ServerProtoUtils.toRaftConfiguration(entry)); - } - }); + log = initLog(id, prop, lastApplied, this::setRaftConf); RaftLog.Metadata metadata = log.loadMetadata(); currentTerm = metadata.getTerm(); @@ -344,6 +338,12 @@ public class ServerState implements Closeable { getRaftConf().getLogEntryIndex(); } + void setRaftConf(LogEntryProto entry) { + if (entry.hasConfigurationEntry()) { + setRaftConf(entry.getIndex(), ServerProtoUtils.toRaftConfiguration(entry)); + } + } + void setRaftConf(long logIndex, RaftConfiguration conf) { configurationManager.addConfiguration(logIndex, conf); server.getServerRpc().addPeers(conf.getPeers()); @@ -354,11 +354,7 @@ public class ServerState implements Closeable { void updateConfiguration(LogEntryProto[] entries) { if (entries != null && entries.length > 0) { configurationManager.removeConfigurations(entries[0].getIndex()); - for (LogEntryProto entry : entries) { - if (ProtoUtils.isConfigurationLogEntry(entry)) { - setRaftConf(entry.getIndex(), ServerProtoUtils.toRaftConfiguration(entry)); - } - } + Arrays.stream(entries).forEach(this::setRaftConf); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java index 968a1ce..5f61864 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java @@ -18,7 +18,6 @@ package org.apache.ratis.server.storage; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -26,7 +25,6 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ProtoUtils; import java.io.File; import java.io.IOException; @@ -48,7 +46,7 @@ import java.util.function.Consumer; */ class LogSegment implements Comparable<Long> { static long getEntrySize(LogEntryProto entry) { - final int serialized = ProtoUtils.removeStateMachineData(entry).getSerializedSize(); + final int serialized = ServerProtoUtils.removeStateMachineData(entry).getSerializedSize(); return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4; } @@ -270,7 +268,7 @@ class LogSegment implements Comparable<Long> { if (keepEntryInCache) { entryCache.put(record.getTermIndex(), entry); } - if (ProtoUtils.isConfigurationLogEntry(entry)) { + if (entry.hasConfigurationEntry()) { configEntries.add(record.getTermIndex()); } totalSize += getEntrySize(entry); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index 07c73ec..5a09aef 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -25,7 +25,6 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ProtoUtils; import java.util.ArrayList; import java.util.Collections; @@ -207,6 +206,6 @@ public class MemoryRaftLog extends RaftLog { @Override public boolean isConfigEntry(TermIndex ti) { - return ProtoUtils.isConfigurationLogEntry(get(ti.getIndex())); + return get(ti.getIndex()).hasConfigurationEntry(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index c666624..261f3b0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -31,7 +31,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ProtoUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +152,7 @@ public abstract class RaftLog implements Closeable { } // build the log entry after calling the StateMachine - final LogEntryProto e = ProtoUtils.toLogEntryProto( + final LogEntryProto e = ServerProtoUtils.toLogEntryProto( operation.getStateMachineLogEntry(), term, nextIndex, clientId, callId); int entrySize = e.getSerializedSize(); @@ -360,7 +359,7 @@ public abstract class RaftLog implements Closeable { } public long getSerializedSize() { - return ProtoUtils.getSerializedSize(logEntry); + return ServerProtoUtils.getSerializedSize(logEntry); } public LogEntryProto getEntry() throws RaftLogIOException { @@ -370,7 +369,7 @@ public abstract class RaftLog implements Closeable { } try { - entryProto = future.thenApply(data -> ProtoUtils.addStateMachineData(data, logEntry)).join(); + entryProto = future.thenApply(data -> ServerProtoUtils.addStateMachineData(data, logEntry)).join(); } catch (Throwable t) { final String err = selfId + ": Failed readStateMachineData for " + ServerProtoUtils.toLogEntryString(logEntry); @@ -379,7 +378,7 @@ public abstract class RaftLog implements Closeable { } // by this time we have already read the state machine data, // so the log entry data should be set now - if (ProtoUtils.shouldReadStateMachineData(entryProto)) { + if (ServerProtoUtils.shouldReadStateMachineData(entryProto)) { final String err = selfId + ": State machine data not set for " + ServerProtoUtils.toLogEntryString(logEntry); LogAppender.LOG.error(err); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 30df276..b5b9ac5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -276,7 +276,7 @@ class RaftLogWorker implements Runnable { private final CompletableFuture<Long> combined; WriteLog(LogEntryProto entry) { - this.entry = ProtoUtils.removeStateMachineData(entry); + this.entry = ServerProtoUtils.removeStateMachineData(entry); if (this.entry == entry || stateMachine == null) { this.stateMachineFuture = null; } else { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 862e21f..c16c478 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -31,7 +31,6 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ProtoUtils; import java.io.File; import java.io.IOException; @@ -195,7 +194,7 @@ public class SegmentedRaftLog extends RaftLog { @Override public EntryWithData getEntryWithData(long index) throws RaftLogIOException { final LogEntryProto entry = get(index); - if (!ProtoUtils.shouldReadStateMachineData(entry)) { + if (!ServerProtoUtils.shouldReadStateMachineData(entry)) { return new EntryWithData(entry, null); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index c789610..fe25083 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -187,9 +187,8 @@ public interface RaftTestUtil { if (e.hasStateMachineLogEntry()) { LOG.info(ServerProtoUtils.toString(e) + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", ")); entries.add(e); - } else if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY) { - LOG.info("Found " + LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY + " at " + ti - + ", ignoring it."); + } else if (e.hasConfigurationEntry()) { + LOG.info("Found ConfigurationEntry at {}, ignoring it.", ti); } else { throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + ti + ": " + ServerProtoUtils.toString(e)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 16536e7..e9651d6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Arrays.asList; import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; -import static org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; public abstract class RaftReconfigurationBaseTest extends BaseTest { static { @@ -544,15 +543,15 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { }); clientThread.start(); - // find CONFIGURATIONENTRY, there may be NOOP before and after it. + // find ConfigurationEntry final long confIndex = JavaUtils.attempt(() -> { final long last = log.getLastEntryTermIndex().getIndex(); for (long i = last; i >= 1; i--) { - if (log.get(i).getLogEntryBodyCase() == CONFIGURATIONENTRY) { + if (log.get(i).hasConfigurationEntry()) { return i; } } - throw new Exception("CONFIGURATIONENTRY not found: last=" + last); + throw new Exception("ConfigurationEntry not found: last=" + last); }, 10, 500, "confIndex", LOG); // wait till the old leader persist the new conf @@ -572,7 +571,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { // the old leader should have truncated the setConf from the log JavaUtils.attempt(() -> log.getLastCommittedIndex() >= confIndex, 10, 500L, "COMMIT", LOG); - Assert.assertEquals(CONFIGURATIONENTRY, log.get(confIndex).getLogEntryBodyCase()); + Assert.assertTrue(log.get(confIndex).hasConfigurationEntry()); log2 = null; } finally { RaftStorageTestUtils.printLog(log2, s -> LOG.info(s)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java index a883727..129c65c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java @@ -26,13 +26,13 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.impl.ServerState; import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault; import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.SizeInBytes; import org.junit.Assert; import org.junit.Test; @@ -193,7 +193,7 @@ public class TestCacheEviction extends BaseTest { for (SegmentRange range : slist) { for (long index = range.start; index <= range.end; index++) { SimpleOperation m = new SimpleOperation(new String(new byte[1024])); - eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index, clientId, callId)); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java index eeb7ea0..f9bdd6a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -23,10 +23,10 @@ import java.util.Iterator; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.util.ProtoUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -47,7 +47,7 @@ public class TestRaftLogCache { LogSegment s = LogSegment.newOpenSegment(null, start); for (long i = start; i <= end; i++) { SimpleOperation m = new SimpleOperation("m" + i); - LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, clientId, callId); s.appendToOpenSegment(entry); } @@ -137,7 +137,7 @@ public class TestRaftLogCache { final SimpleOperation m = new SimpleOperation("m"); try { - LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0, clientId, callId); cache.appendEntry(entry); Assert.fail("the open segment is null"); @@ -147,7 +147,7 @@ public class TestRaftLogCache { LogSegment openSegment = prepareLogSegment(100, 100, true); cache.addSegment(openSegment); for (long index = 101; index < 200; index++) { - LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index, clientId, callId); cache.appendEntry(entry); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java index 6ffb1bf..49ae37c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java @@ -25,10 +25,10 @@ import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.ProtoUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -93,7 +93,7 @@ public class TestRaftLogReadWrite extends BaseTest { long size = 0; for (int i = 0; i < entries.length; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, clientId, callId); final int s = entries[i].getSerializedSize(); size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4; @@ -137,7 +137,7 @@ public class TestRaftLogReadWrite extends BaseTest { preallocatedSize, bufferSize)) { for (int i = 0; i < 100; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, clientId, callId); out.write(entries[i]); } @@ -148,7 +148,7 @@ public class TestRaftLogReadWrite extends BaseTest { preallocatedSize, bufferSize)) { for (int i = 100; i < 200; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, clientId, callId); out.write(entries[i]); } @@ -205,7 +205,7 @@ public class TestRaftLogReadWrite extends BaseTest { 16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize); for (int i = 0; i < 10; i++) { SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i, clientId, callId); out.write(entries[i]); } @@ -253,7 +253,7 @@ public class TestRaftLogReadWrite extends BaseTest { new LogOutputStream(openSegment, false, segmentMaxSize, preallocatedSize, bufferSize)) { for (int i = 0; i < 100; i++) { - LogEntryProto entry = ProtoUtils.toLogEntryProto( + LogEntryProto entry = ServerProtoUtils.toLogEntryProto( new SimpleOperation("m" + i).getLogEntryContent(), 0, i, clientId, callId); out.write(entry); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index 0f2a669..24f803c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -23,11 +23,11 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TraditionalBinaryPrefix; import org.junit.After; @@ -90,7 +90,7 @@ public class TestRaftLogSegment extends BaseTest { segmentMaxSize, preallocatedSize, bufferSize)) { for (int i = 0; i < size; i++) { SimpleOperation op = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), + entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + start, clientId, callId); out.write(entries[i]); } @@ -164,7 +164,7 @@ public class TestRaftLogSegment extends BaseTest { List<LogEntryProto> list = new ArrayList<>(); while (size < max) { SimpleOperation op = new SimpleOperation("m" + i); - LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start, clientId, callId); size += getEntrySize(entry); list.add(entry); @@ -181,18 +181,18 @@ public class TestRaftLogSegment extends BaseTest { SimpleOperation op = new SimpleOperation("m"); final StateMachineLogEntryProto m = op.getLogEntryContent(); try { - LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1001, clientId, callId); segment.appendToOpenSegment(entry); Assert.fail("should fail since the entry's index needs to be 1000"); } catch (IllegalStateException e) { // the exception is expected. } - LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000, clientId, callId); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1000, clientId, callId); segment.appendToOpenSegment(entry); try { - entry = ProtoUtils.toLogEntryProto(m, 0, 1002, clientId, callId); + entry = ServerProtoUtils.toLogEntryProto(m, 0, 1002, clientId, callId); segment.appendToOpenSegment(entry); Assert.fail("should fail since the entry's index needs to be 1001"); } catch (IllegalStateException e) { @@ -201,7 +201,7 @@ public class TestRaftLogSegment extends BaseTest { LogEntryProto[] entries = new LogEntryProto[2]; for (int i = 0; i < 2; i++) { - entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2, clientId, callId); + entries[i] = ServerProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2, clientId, callId); } try { segment.appendToOpenSegment(entries); @@ -217,7 +217,7 @@ public class TestRaftLogSegment extends BaseTest { final long start = 1000; LogSegment segment = LogSegment.newOpenSegment(null, start); for (int i = 0; i < 100; i++) { - LogEntryProto entry = ProtoUtils.toLogEntryProto( + LogEntryProto entry = ServerProtoUtils.toLogEntryProto( new SimpleOperation("m" + i).getLogEntryContent(), term, i + start, clientId, callId); segment.appendToOpenSegment(entry); } @@ -273,7 +273,7 @@ public class TestRaftLogSegment extends BaseTest { try (LogOutputStream out = new LogOutputStream(file, false, 1024, 1024, bufferSize)) { SimpleOperation op = new SimpleOperation(new String(content)); - LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0, clientId, callId); size = LogSegment.getEntrySize(entry); out.write(entry); @@ -301,7 +301,7 @@ public class TestRaftLogSegment extends BaseTest { final byte[] content = new byte[1024]; Arrays.fill(content, (byte) 1); SimpleOperation op = new SimpleOperation(new String(content)); - LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0, clientId, callId); final long entrySize = LogSegment.getEntrySize(entry); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82ced5d/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 0de9e8a..919d8e7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -28,11 +28,11 @@ import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RetryCacheTestUtil; import org.apache.ratis.server.impl.RetryCache; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.SizeInBytes; import org.junit.After; import org.junit.Assert; @@ -117,7 +117,7 @@ public class TestSegmentedRaftLog extends BaseTest { segmentMaxSize, preallocatedSize, bufferSize)) { for (int i = 0; i < size; i++) { SimpleOperation m = new SimpleOperation("m" + (i + range.start)); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + range.start, clientId, callId); out.write(entries[i]); } @@ -182,7 +182,7 @@ public class TestSegmentedRaftLog extends BaseTest { SimpleOperation m = stringSupplier == null ? new SimpleOperation("m" + index) : new SimpleOperation(stringSupplier.get()); - eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index, clientId, index)); } }
