http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java new file mode 100644 index 0000000..d079abf --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -0,0 +1,832 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.impl; + +import static org.apache.ratis.util.LifeCycle.State.CLOSED; +import static org.apache.ratis.util.LifeCycle.State.CLOSING; +import static org.apache.ratis.util.LifeCycle.State.RUNNING; +import static org.apache.ratis.util.LifeCycle.State.STARTING; + +import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.NotLeaderException; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftException; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.ReconfigurationInProgressException; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.FileInfo; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import 
org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.RaftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

/**
 * Server-side implementation of {@link RaftServer}.
 *
 * The server tracks its current {@link Role} (leader, candidate or follower),
 * starts/stops the helper object that belongs to each role, handles client
 * requests and handles the requestVote, appendEntries and installSnapshot
 * RPCs sent by other peers.
 */
public class RaftServerImpl implements RaftServer {
  public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class);

  private static final String CLASS_NAME = RaftServerImpl.class.getSimpleName();
  static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
  static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
  static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";


  /** Role of raft peer */
  enum Role {
    LEADER, CANDIDATE, FOLLOWER
  }

  private final int minTimeoutMs;
  private final int maxTimeoutMs;

  private final LifeCycle lifeCycle;
  private final ServerState state;
  private final StateMachine stateMachine;
  private final RaftProperties properties;
  private volatile Role role;

  /** used when the peer is follower, to monitor election timeout */
  private volatile FollowerState heartbeatMonitor;

  /** used when the peer is candidate, to request votes from other peers */
  private volatile LeaderElection electionDaemon;

  /** used when the peer is leader */
  private volatile LeaderState leaderState;

  private RaftServerRpc serverRpc;

  private final LogAppenderFactory appenderFactory;

  /**
   * Creates a server, reading the min/max rpc timeouts from the properties.
   *
   * @throws IllegalArgumentException if the max timeout is not strictly
   *         greater than the min timeout.
   */
  public RaftServerImpl(String id, RaftConfiguration raftConf,
      RaftProperties properties, StateMachine stateMachine) throws IOException {
    this.lifeCycle = new LifeCycle(id);
    minTimeoutMs = properties.getInt(
        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
    maxTimeoutMs = properties.getInt(
        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
    Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs,
        "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
    this.properties = properties;
    this.stateMachine = stateMachine;
    this.state = new ServerState(id, raftConf, properties, this, stateMachine);
    // must run after this.properties is assigned: the factory class is read from it
    appenderFactory = initAppenderFactory();
  }

  int getMinTimeoutMs() {
    return minTimeoutMs;
  }

  int getMaxTimeoutMs() {
    return maxTimeoutMs;
  }

  /** @return a timeout picked by {@link RaftUtils#getRandomBetween} from the min and max timeouts. */
  int getRandomTimeoutMs() {
    return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs);
  }

  @Override
  public StateMachine getStateMachine() {
    return this.stateMachine;
  }

  public LogAppenderFactory getLogAppenderFactory() {
    return appenderFactory;
  }

  /** Instantiates the {@link LogAppenderFactory} class configured in the properties. */
  private LogAppenderFactory initAppenderFactory() {
    Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
        LogAppenderFactory.class);
    return RaftUtils.newInstance(factoryClass);
  }

  /**
   * Used by tests to set initial raft configuration with correct port bindings.
   */
  @VisibleForTesting
  public void setInitialConf(RaftConfiguration conf) {
    this.state.setInitialConf(conf);
  }

  @Override
  public void setServerRpc(RaftServerRpc serverRpc) {
    this.serverRpc = serverRpc;
    // add peers into rpc service
    RaftConfiguration conf = getRaftConf();
    if (conf != null) {
      serverRpc.addPeers(conf.getPeers());
    }
  }

  public RaftServerRpc getServerRpc() {
    return serverRpc;
  }

  /**
   * Starts the server: as a follower when the current configuration contains
   * this peer, otherwise in the initializing state.
   */
  @Override
  public void start() {
    lifeCycle.transition(STARTING);
    state.start();
    RaftConfiguration conf = getRaftConf();
    if (conf != null && conf.contains(getId())) {
      LOG.debug("{} starts as a follower", getId());
      startAsFollower();
    } else {
      LOG.debug("{} starts with initializing state", getId());
      startInitializing();
    }
  }

  /**
   * The peer belongs to the current configuration, should start as a follower
   */
  private void startAsFollower() {
    role = Role.FOLLOWER;
    heartbeatMonitor = new FollowerState(this);
    heartbeatMonitor.start();

    serverRpc.start();
    lifeCycle.transition(RUNNING);
  }

  /**
   * The peer does not have any configuration (maybe it will later be included
   * in some configuration). Start still as a follower but will not vote or
   * start election.
   */
  private void startInitializing() {
    role = Role.FOLLOWER;
    // do not start heartbeatMonitoring
    // (the life cycle stays in STARTING; appendEntries moves it to RUNNING)
    serverRpc.start();
  }

  public ServerState getState() {
    return this.state;
  }

  @Override
  public String getId() {
    return getState().getSelfId();
  }

  RaftConfiguration getRaftConf() {
    return getState().getRaftConf();
  }

  /**
   * Stops the role-specific helpers, then closes the rpc service and the
   * server state. A failure is logged and not rethrown.
   */
  @Override
  public void close() {
    lifeCycle.checkStateAndClose(() -> {
      try {
        shutdownHeartbeatMonitor();
        shutdownElectionDaemon();
        shutdownLeaderState();

        serverRpc.close();
        state.close();
      } catch (Exception e) {
        // best effort: report the failure but let the close complete
        LOG.warn("Failed to kill " + state.getSelfId(), e);
      }
    });
  }

  @VisibleForTesting
  public boolean isAlive() {
    return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
  }

  public boolean isFollower() {
    return role == Role.FOLLOWER;
  }

  public boolean isCandidate() {
    return role == Role.CANDIDATE;
  }

  public boolean isLeader() {
    return role == Role.LEADER;
  }

  /**
   * Change the server state to Follower if necessary
   * @param newTerm The new term.
   * @param sync We will call {@link ServerState#persistMetadata()} if this is
   *             set to true and term/votedFor get updated.
   * @return if the term/votedFor should be updated to the new term
   * @throws IOException if term/votedFor persistence failed.
   */
  synchronized boolean changeToFollower(long newTerm, boolean sync)
      throws IOException {
    final Role old = role;
    role = Role.FOLLOWER;

    boolean metadataUpdated = false;
    if (newTerm > state.getCurrentTerm()) {
      state.setCurrentTerm(newTerm);
      state.resetLeaderAndVotedFor();
      metadataUpdated = true;
    }

    if (old == Role.LEADER) {
      assert leaderState != null;
      shutdownLeaderState();
    } else if (old == Role.CANDIDATE) {
      shutdownElectionDaemon();
    }

    if (old != Role.FOLLOWER) {
      heartbeatMonitor = new FollowerState(this);
      heartbeatMonitor.start();
    }

    if (metadataUpdated && sync) {
      state.persistMetadata();
    }
    return metadataUpdated;
  }

  private synchronized void shutdownLeaderState() {
    final LeaderState leader = leaderState;
    if (leader != null) {
      leader.stop();
    }
    leaderState = null;
    // TODO: make sure that StateMachineUpdater has applied all transactions that have context
  }

  private void shutdownElectionDaemon() {
    final LeaderElection election = electionDaemon;
    if (election != null) {
      election.stopRunning();
      // no need to interrupt the election thread
    }
    electionDaemon = null;
  }

  /** Candidate to leader: stops the election and starts the leader state. */
  synchronized void changeToLeader() {
    Preconditions.checkState(isCandidate());
    shutdownElectionDaemon();
    role = Role.LEADER;
    state.becomeLeader();
    // start sending AppendEntries RPC to followers
    leaderState = new LeaderState(this, properties);
    leaderState.start();
  }

  private void shutdownHeartbeatMonitor() {
    final FollowerState hm = heartbeatMonitor;
    if (hm != null) {
      hm.stopRunning();
      hm.interrupt();
    }
    heartbeatMonitor = null;
  }

  /** Follower to candidate: stops the heartbeat monitor and starts an election. */
  synchronized void changeToCandidate() {
    Preconditions.checkState(isFollower());
    shutdownHeartbeatMonitor();
    role = Role.CANDIDATE;
    // start election
    electionDaemon = new LeaderElection(this);
    electionDaemon.start();
  }

  @Override
  public String toString() {
    return role + " " + state + " " + lifeCycle.getCurrentState();
  }

  /**
   * @return null if the server is in leader state; otherwise an already
   *         completed future carrying a {@link NotLeaderException} reply.
   */
  private CompletableFuture<RaftClientReply> checkLeaderState(
      RaftClientRequest request) {
    if (!isLeader()) {
      final NotLeaderException exception = generateNotLeaderException();
      return CompletableFuture.completedFuture(
          new RaftClientReply(request, exception));
    }
    return null;
  }

  /**
   * Builds the exception returned to clients when this peer is not the leader.
   * When the server is not RUNNING, no leader suggestion or peer list is
   * included.
   */
  NotLeaderException generateNotLeaderException() {
    if (lifeCycle.getCurrentState() != RUNNING) {
      return new NotLeaderException(getId(), null, null);
    }
    String leaderId = state.getLeaderId();
    if (leaderId == null || leaderId.equals(state.getSelfId())) {
      // No idea about who is the current leader. Or the peer is the current
      // leader, but it is about to step down
      RaftPeer suggestedLeader = state.getRaftConf()
          .getRandomPeer(state.getSelfId());
      leaderId = suggestedLeader == null ? null : suggestedLeader.getId();
    }
    RaftConfiguration conf = getRaftConf();
    Collection<RaftPeer> peers = conf.getPeers();
    return new NotLeaderException(getId(), conf.getPeer(leaderId),
        peers.toArray(new RaftPeer[peers.size()]));
  }

  /**
   * Handle a normal update request from client.
   */
  private CompletableFuture<RaftClientReply> appendTransaction(
      RaftClientRequest request, TransactionContext entry)
      throws RaftException {
    LOG.debug("{}: receive client request({})", getId(), request);
    lifeCycle.assertCurrentState(RUNNING);
    CompletableFuture<RaftClientReply> reply;

    final PendingRequest pending;
    synchronized (this) {
      // re-check under the lock: the role may have changed since the caller's check
      reply = checkLeaderState(request);
      if (reply != null) {
        return reply;
      }

      // append the message to its local log
      final long entryIndex;
      try {
        entryIndex = state.applyLog(entry);
      } catch (IOException e) {
        throw new RaftException(e);
      }

      // put the request into the pending queue
      pending = leaderState.addPendingRequest(entryIndex, request, entry);
      leaderState.notifySenders();
    }
    return pending.getFuture();
  }

  @Override
  public CompletableFuture<RaftClientReply> submitClientRequestAsync(
      RaftClientRequest request) throws IOException {
    // first check the server's leader state
    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
    if (reply != null) {
      return reply;
    }

    // let the state machine handle read-only request from client
    if (request.isReadOnly()) {
      // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
      // section 8 (last part)
      return stateMachine.query(request);
    }

    // TODO: this client request will not be added to pending requests
    // until later which means that any failure in between will leave partial state in the
    // state machine. We should call cancelTransaction() for failed requests
    TransactionContext entry = stateMachine.startTransaction(request);
    if (entry.getException().isPresent()) {
      throw RaftUtils.asIOException(entry.getException().get());
    }

    return appendTransaction(request, entry);
  }

  @Override
  public RaftClientReply submitClientRequest(RaftClientRequest request)
      throws IOException {
    return waitForReply(getId(), request, submitClientRequestAsync(request));
  }

  /**
   * Blocks until the future completes. A {@link NotLeaderException} failure is
   * converted to a reply; any other failure is rethrown as an IOException.
   */
  private static RaftClientReply waitForReply(String id, RaftClientRequest request,
      CompletableFuture<RaftClientReply> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      final String s = id + ": Interrupted when waiting for reply, request=" + request;
      LOG.info(s, e);
      throw RaftUtils.toInterruptedIOException(s, e);
    } catch (ExecutionException e) {
      final Throwable cause = e.getCause();
      if (cause == null) {
        throw new IOException(e);
      }
      if (cause instanceof NotLeaderException) {
        return new RaftClientReply(request, (NotLeaderException)cause);
      } else {
        throw RaftUtils.asIOException(cause);
      }
    }
  }

  @Override
  public RaftClientReply setConfiguration(SetConfigurationRequest request)
      throws IOException {
    return waitForReply(getId(), request, setConfigurationAsync(request));
  }

  /**
   * Handle a raft configuration change request from client.
   */
  @Override
  public CompletableFuture<RaftClientReply> setConfigurationAsync(
      SetConfigurationRequest request) throws IOException {
    LOG.debug("{}: receive setConfiguration({})", getId(), request);
    lifeCycle.assertCurrentState(RUNNING);
    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
    if (reply != null) {
      return reply;
    }

    final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
    final PendingRequest pending;
    synchronized (this) {
      // re-check under the lock: the role may have changed since the first check
      reply = checkLeaderState(request);
      if (reply != null) {
        return reply;
      }

      final RaftConfiguration current = getRaftConf();
      // make sure there is no other raft reconfiguration in progress
      if (!current.isStable() || leaderState.inStagingState() ||
          !state.isCurrentConfCommitted()) {
        throw new ReconfigurationInProgressException(
            "Reconfiguration is already in progress: " + current);
      }

      // nothing to change if the new configuration is the same as the current one
      if (current.hasNoChange(peersInNewConf)) {
        pending = leaderState.returnNoConfChange(request);
        return pending.getFuture();
      }

      // add new peers into the rpc service
      getServerRpc().addPeers(Arrays.asList(peersInNewConf));
      // add staging state into the leaderState
      pending = leaderState.startSetConfiguration(request);
    }
    return pending.getFuture();
  }

  private boolean shouldWithholdVotes() {
    return isLeader() || (isFollower() && state.hasLeader()
        && heartbeatMonitor.shouldWithholdVotes());
  }

  /**
   * check if the remote peer is not included in the current conf
   * and should shutdown. should shutdown if all the following stands:
   * 1. this is a leader
   * 2. current conf is stable and has been committed
   * 3. candidate id is not included in conf
   * 4. candidate's last entry's index < conf's index
   * 5. the candidate is not a peer that the leader is bootstrapping
   */
  private boolean shouldSendShutdown(String candidateId,
      TermIndex candidateLastEntry) {
    return isLeader()
        && getRaftConf().isStable()
        && getState().isConfCommitted()
        && !getRaftConf().containsInConf(candidateId)
        && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex()
        && !leaderState.isBootStrappingPeer(candidateId);
  }

  @Override
  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
      throws IOException {
    final String candidateId = r.getServerRequest().getRequestorId();
    return requestVote(candidateId, r.getCandidateTerm(),
        ServerProtoUtils.toTermIndex(r.getCandidateLastEntry()));
  }

  /**
   * Handles a vote request: the vote is granted only if votes are not being
   * withheld, the candidate is recognized and its log is up to date.
   */
  private RequestVoteReplyProto requestVote(String candidateId,
      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
        candidateId, candidateTerm, candidateLastEntry);
    LOG.debug("{}: receive requestVote({}, {}, {})",
        getId(), candidateId, candidateTerm, candidateLastEntry);
    lifeCycle.assertCurrentState(RUNNING);

    boolean voteGranted = false;
    boolean shouldShutdown = false;
    final RequestVoteReplyProto reply;
    synchronized (this) {
      if (shouldWithholdVotes()) {
        LOG.info("{} Withhold vote from server {} with term {}. "
            + "This server:{}, last rpc time from leader {} is {}", getId(),
            candidateId, candidateTerm, this, this.getState().getLeaderId(),
            (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
        boolean termUpdated = changeToFollower(candidateTerm, false);
        // see Section 5.4.1 Election restriction
        if (state.isLogUpToDate(candidateLastEntry)) {
          heartbeatMonitor.updateLastRpcTime(false);
          state.grantVote(candidateId);
          voteGranted = true;
        }
        if (termUpdated || voteGranted) {
          state.persistMetadata(); // sync metafile
        }
      }
      if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) {
        shouldShutdown = true;
      }
      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
          voteGranted, state.getCurrentTerm(), shouldShutdown);
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} replies to vote request: {}. Peer's state: {}",
            getId(), ProtoUtils.toString(reply), state);
      }
    }
    return reply;
  }

  /**
   * Checks that the entries have consecutive indices following
   * {@code previous} and that no entry has a term larger than
   * {@code expectedTerm}.
   *
   * @throws IllegalArgumentException if a check fails.
   */
  private void validateEntries(long expectedTerm, TermIndex previous,
      LogEntryProto... entries) {
    if (entries != null && entries.length > 0) {
      final long index0 = entries[0].getIndex();

      if (previous == null || previous.getTerm() == 0) {
        Preconditions.checkArgument(index0 == 0,
            "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
            0, index0);
      } else {
        Preconditions.checkArgument(previous.getIndex() == index0 - 1,
            "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
            previous, 0, index0);
      }

      for (int i = 0; i < entries.length; i++) {
        final long t = entries[i].getTerm();
        Preconditions.checkArgument(expectedTerm >= t,
            "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
            i, t, expectedTerm);

        final long indexi = entries[i].getIndex();
        Preconditions.checkArgument(indexi == index0 + i,
            "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
            i, indexi, index0);
      }
    }
  }

  @Override
  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
      throws IOException {
    // TODO avoid converting list to array
    final LogEntryProto[] entries = r.getEntriesList()
        .toArray(new LogEntryProto[r.getEntriesCount()]);
    final TermIndex previous = r.hasPreviousLog() ?
        ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
    return appendEntries(r.getServerRequest().getRequestorId(),
        r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
        entries);
  }

  /**
   * Handles an appendEntries call from {@code leaderId}.
   *
   * The reply is NOT_LEADER when the leader is not recognized, INCONSISTENCY
   * when {@code previous} is not found locally, and SUCCESS otherwise.
   *
   * @throws IOException if the entries fail validation or the log sync is
   *         interrupted.
   */
  private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm,
      TermIndex previous, long leaderCommit, boolean initializing,
      LogEntryProto... entries) throws IOException {
    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
    if (LOG.isDebugEnabled()) {
      LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
          leaderId, leaderTerm, previous, leaderCommit, initializing,
          ServerProtoUtils.toString(entries));
    }
    lifeCycle.assertCurrentState(STARTING, RUNNING);

    try {
      validateEntries(leaderTerm, previous, entries);
    } catch (IllegalArgumentException e) {
      throw new IOException(e);
    }

    final long currentTerm;
    long nextIndex = state.getLog().getNextIndex();
    synchronized (this) {
      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
      currentTerm = state.getCurrentTerm();
      if (!recognized) {
        final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
            leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
        if (LOG.isDebugEnabled()) {
          LOG.debug("{}: do not recognize leader. Reply: {}",
              getId(), ProtoUtils.toString(reply));
        }
        return reply;
      }
      changeToFollower(leaderTerm, true);
      state.setLeader(leaderId);

      // an initializing peer leaves STARTING on the first non-initializing request
      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
        heartbeatMonitor = new FollowerState(this);
        heartbeatMonitor.start();
      }
      if (lifeCycle.getCurrentState() == RUNNING) {
        heartbeatMonitor.updateLastRpcTime(true);
      }

      // We need to check if "previous" is in the local peer. Note that it is
      // possible that "previous" is covered by the latest snapshot: e.g.,
      // it's possible there's no log entries outside of the latest snapshot.
      // However, it is not possible that "previous" index is smaller than the
      // last index included in snapshot. This is because indices <= snapshot's
      // last index should have been committed.
      if (previous != null && !containPrevious(previous)) {
        final AppendEntriesReplyProto reply =
            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
                currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY);
        // guard: avoid building the reply string when debug logging is off
        if (LOG.isDebugEnabled()) {
          LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
              getId(), previous, ServerProtoUtils.toString(reply));
        }
        return reply;
      }

      state.getLog().append(entries);
      state.updateConfiguration(entries);
      state.updateStatemachine(leaderCommit, currentTerm);
    }
    if (entries != null && entries.length > 0) {
      try {
        state.getLog().logSync();
      } catch (InterruptedException e) {
        // keep the InterruptedException as the cause, as waitForReply does
        throw RaftUtils.toInterruptedIOException("logSync got interrupted", e);
      }
      nextIndex = entries[entries.length - 1].getIndex() + 1;
    }
    synchronized (this) {
      if (lifeCycle.getCurrentState() == RUNNING && isFollower()
          && getState().getCurrentTerm() == currentTerm) {
        // reset election timer to avoid punishing the leader for our own
        // long disk writes
        heartbeatMonitor.updateLastRpcTime(false);
      }
    }
    final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
        leaderId, getId(), currentTerm, nextIndex, SUCCESS);
    // guard: avoid building the reply string when debug logging is off
    if (LOG.isDebugEnabled()) {
      LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
          ServerProtoUtils.toString(reply));
    }
    return reply;
  }

  /**
   * @return true if {@code previous} is in the local log, or equals the
   *         term-index of the latest snapshot or of the latest installed
   *         snapshot.
   */
  private boolean containPrevious(TermIndex previous) {
    LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
        getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot());
    return state.getLog().contains(previous)
        || (state.getLatestSnapshot() != null
            && state.getLatestSnapshot().getTermIndex().equals(previous))
        || (state.getLatestInstalledSnapshot() != null
            && state.getLatestInstalledSnapshot().equals(previous));
  }

  /**
   * Handles one installSnapshot request (a chunk of a snapshot) sent by
   * {@code leaderId}; the state machine is reloaded when the request is
   * marked as done.
   */
  @Override
  public InstallSnapshotReplyProto installSnapshot(
      InstallSnapshotRequestProto request) throws IOException {
    final String leaderId = request.getServerRequest().getRequestorId();
    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request);
    LOG.debug("{}: receive installSnapshot({})", getId(), request);

    lifeCycle.assertCurrentState(STARTING, RUNNING);

    final long currentTerm;
    final long leaderTerm = request.getLeaderTerm();
    final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
        request.getTermIndex());
    final long lastIncludedIndex = lastTermIndex.getIndex();
    synchronized (this) {
      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
      currentTerm = state.getCurrentTerm();
      if (!recognized) {
        final InstallSnapshotReplyProto reply = ServerProtoUtils
            .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
        LOG.debug("{}: do not recognize leader for installing snapshot."
            + " Reply: {}", getId(), reply);
        return reply;
      }
      changeToFollower(leaderTerm, true);
      state.setLeader(leaderId);

      if (lifeCycle.getCurrentState() == RUNNING) {
        heartbeatMonitor.updateLastRpcTime(true);
      }

      // Check and append the snapshot chunk. We simply put this in lock
      // considering a follower peer requiring a snapshot installation does not
      // have a lot of requests
      Preconditions.checkState(
          state.getLog().getNextIndex() <= lastIncludedIndex,
          "%s log's next id is %s, last included index in snapshot is %s",
          getId(), state.getLog().getNextIndex(), lastIncludedIndex);

      //TODO: We should only update State with installed snapshot once the request is done.
      state.installSnapshot(request);

      // update the committed index
      // re-load the state machine if this is the last chunk
      if (request.getDone()) {
        state.reloadStateMachine(lastIncludedIndex, leaderTerm);
      }
      if (lifeCycle.getCurrentState() == RUNNING) {
        heartbeatMonitor.updateLastRpcTime(false);
      }
    }
    if (request.getDone()) {
      LOG.info("{}: successfully install the whole snapshot-{}", getId(),
          lastIncludedIndex);
    }
    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
        currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
  }

  AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
      String targetId, TermIndex previous, List<LogEntryProto> entries,
      boolean initializing) {
    return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId,
        leaderTerm, entries, state.getLog().getLastCommittedIndex(),
        initializing, previous);
  }

  /**
   * Builds an installSnapshot request; the total size sent is the sum of the
   * sizes of the snapshot's files.
   */
  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
      String targetId, String requestId, int requestIndex, SnapshotInfo snapshot,
      List<FileChunkProto> chunks, boolean done) {
    OptionalLong totalSize = snapshot.getFiles().stream()
        .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
    // empty only when the snapshot has no files
    assert totalSize.isPresent();
    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
        requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(),
        chunks, totalSize.getAsLong(), done);
  }

  synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId,
      long term, TermIndex lastEntry) {
    return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term,
        lastEntry);
  }

  public synchronized void submitLocalSyncEvent() {
    if (isLeader() && leaderState != null) {
      leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT);
    }
  }

  synchronized void replyPendingRequest(long logIndex,
      CompletableFuture<Message> message) {
    if (isLeader() && leaderState != null) { // is leader and is running
      leaderState.replyPendingRequest(logIndex, message);
    }
  }

  /**
   * @return the transaction context the leader state holds for the given
   *         index, or null when there is no leader state.
   */
  TransactionContext getTransactionContext(long index) {
    // This method is not synchronized: read the volatile field once so that a
    // concurrent shutdownLeaderState() cannot null it between check and use.
    final LeaderState leader = leaderState;
    if (leader != null) { // is leader and is running
      return leader.getTransactionContext(index);
    }
    return null;
  }

  public RaftProperties getProperties() {
    return this.properties;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java new file mode 100644 index 0000000..4d4371d --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.impl; + +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; +import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftConfigurationProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.TermIndexProto; +import org.apache.ratis.util.ProtoUtils; + +
/**
 * Server proto utilities for internal use: conversions between server-side
 * objects and their protobuf messages, plus factories for the request and
 * reply protos exchanged between servers.
 */
public class ServerProtoUtils {
  /** @return the {@link TermIndex} carried by {@code p}, or null when {@code p} is null. */
  public static TermIndex toTermIndex(TermIndexProto p) {
    if (p == null) {
      return null;
    }
    return TermIndex.newTermIndex(p.getTerm(), p.getIndex());
  }

  /** @return the proto form of {@code ti}, or null when {@code ti} is null. */
  public static TermIndexProto toTermIndexProto(TermIndex ti) {
    if (ti == null) {
      return null;
    }
    final TermIndexProto.Builder builder = TermIndexProto.newBuilder();
    builder.setTerm(ti.getTerm());
    builder.setIndex(ti.getIndex());
    return builder.build();
  }

  /** @return the term and index of {@code entry}, or null when {@code entry} is null. */
  public static TermIndex toTermIndex(LogEntryProto entry) {
    if (entry == null) {
      return null;
    }
    return TermIndex.newTermIndex(entry.getTerm(), entry.getIndex());
  }

  /**
   * Render the given entries by their term/index only.
   *
   * @return "null" for a null array, "[]" for an empty one, the single
   *         term/index for one entry, and a list of term/indices otherwise.
   */
  public static String toString(LogEntryProto... entries) {
    if (entries == null) {
      return "null";
    }
    if (entries.length == 0) {
      return "[]";
    }
    if (entries.length == 1) {
      return String.valueOf(toTermIndex(entries[0]));
    }
    final List<TermIndex> termIndices = Arrays.stream(entries)
        .map(ServerProtoUtils::toTermIndex)
        .collect(Collectors.toList());
    return String.valueOf(termIndices);
  }

  /** @return a one-line summary of the given append-entries reply. */
  public static String toString(AppendEntriesReplyProto reply) {
    final StringBuilder text = new StringBuilder(toString(reply.getServerReply()));
    text.append(",").append(reply.getResult());
    text.append(",nextIndex:").append(reply.getNextIndex());
    text.append(",term:").append(reply.getTerm());
    return text.toString();
  }

  /** @return "requestor->replier,success" for the given rpc reply header. */
  private static String toString(RaftRpcReplyProto reply) {
    return reply.getRequestorId()
        + "->" + reply.getReplyId()
        + "," + reply.getSuccess();
  }

  /** Convert a configuration to its proto: current peers plus old peers. */
  public static RaftConfigurationProto toRaftConfigurationProto(
      RaftConfiguration conf) {
    final RaftConfigurationProto.Builder builder = RaftConfigurationProto.newBuilder();
    builder.addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInConf()));
    builder.addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInOldConf()));
    return builder.build();
  }

  /**
   * Build a {@link RaftConfiguration} from its proto.
   * The old conf is set only when the proto lists at least one old peer.
   *
   * @param index the log entry index recorded in the configuration.
   */
  public static RaftConfiguration toRaftConfiguration(
      long index, RaftConfigurationProto proto) {
    final RaftConfiguration.Builder builder = RaftConfiguration.newBuilder()
        .setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList()))
        .setLogEntryIndex(index);
    final boolean hasOldPeers = proto.getOldPeersCount() > 0;
    if (hasOldPeers) {
      builder.setOldConf(ProtoUtils.toRaftPeerArray(proto.getOldPeersList()));
    }
    return builder.build();
  }

  /** Create a log entry whose body is the given configuration. */
  public static LogEntryProto toLogEntryProto(
      RaftConfiguration conf, long term, long index) {
    final RaftConfigurationProto confProto = toRaftConfigurationProto(conf);
    return LogEntryProto.newBuilder()
        .setTerm(term)
        .setIndex(index)
        .setConfigurationEntry(confProto)
        .build();
  }

  /** Create the reply to a request-vote rpc. */
  public static RequestVoteReplyProto toRequestVoteReplyProto(
      String requestorId, String replyId, boolean success, long term,
      boolean shouldShutdown) {
    final RaftRpcReplyProto.Builder header = ClientProtoUtils.toRaftRpcReplyProtoBuilder(
        requestorId, replyId, DEFAULT_SEQNUM, success);
    return RequestVoteReplyProto.newBuilder()
        .setServerReply(header)
        .setTerm(term)
        .setShouldShutdown(shouldShutdown)
        .build();
  }

  /**
   * Create a request-vote rpc.
   * The candidate's last entry is set only when {@code lastEntry} is non-null.
   */
  public static RequestVoteRequestProto toRequestVoteRequestProto(
      String requestorId, String replyId, long term, TermIndex lastEntry) {
    final RequestVoteRequestProto.Builder builder = RequestVoteRequestProto.newBuilder();
    builder.setServerRequest(
        ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM));
    builder.setCandidateTerm(term);
    if (lastEntry != null) {
      builder.setCandidateLastEntry(toTermIndexProto(lastEntry));
    }
    return builder.build();
  }

  /**
   * Create the reply to an install-snapshot rpc. The reply header is marked
   * successful exactly when {@code result} is {@link InstallSnapshotResult#SUCCESS}.
   */
  public static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
      String requestorId, String replyId, long term, int requestIndex,
      InstallSnapshotResult result) {
    final boolean succeeded = result == InstallSnapshotResult.SUCCESS;
    final RaftRpcReplyProto.Builder header = ClientProtoUtils.toRaftRpcReplyProtoBuilder(
        requestorId, replyId, DEFAULT_SEQNUM, succeeded);
    return InstallSnapshotReplyProto.newBuilder()
        .setServerReply(header)
        .setTerm(term)
        .setResult(result)
        .setRequestIndex(requestIndex)
        .build();
  }

  /** Create an install-snapshot rpc carrying the given file chunks. */
  public static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
      String requestorId, String replyId, String requestId, int requestIndex,
      long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
      long totalSize, boolean done) {
    final InstallSnapshotRequestProto.Builder builder =
        InstallSnapshotRequestProto.newBuilder();
    builder.setServerRequest(
        ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM));
    builder.setRequestId(requestId);
    builder.setRequestIndex(requestIndex);
    // .setRaftConfiguration() TODO: save and pass RaftConfiguration
    builder.setLeaderTerm(term);
    builder.setTermIndex(toTermIndexProto(lastTermIndex));
    builder.addAllFileChunks(chunks);
    builder.setTotalSize(totalSize);
    builder.setDone(done);
    return builder.build();
  }

  /**
   * Create the reply to an append-entries rpc. The reply header is marked
   * successful exactly when {@code appendResult} is SUCCESS.
   */
  public static AppendEntriesReplyProto toAppendEntriesReplyProto(
      String requestorId, String replyId, long term,
      long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) {
    final boolean succeeded = appendResult == SUCCESS;
    final RaftRpcReplyProto.Builder header = ClientProtoUtils.toRaftRpcReplyProtoBuilder(
        requestorId, replyId, DEFAULT_SEQNUM, succeeded);
    return AppendEntriesReplyProto.newBuilder()
        .setServerReply(header)
        .setTerm(term)
        .setNextIndex(nextIndex)
        .setResult(appendResult)
        .build();
  }

  /**
   * Create an append-entries rpc. Entries are added only when the list is
   * non-null and non-empty; the previous log position is set only when
   * {@code previous} is non-null.
   */
  public static AppendEntriesRequestProto toAppendEntriesRequestProto(
      String requestorId, String replyId, long leaderTerm,
      List<LogEntryProto> entries, long leaderCommit, boolean initializing,
      TermIndex previous) {
    final AppendEntriesRequestProto.Builder builder = AppendEntriesRequestProto.newBuilder();
    builder.setServerRequest(
        ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM));
    builder.setLeaderTerm(leaderTerm);
    builder.setLeaderCommit(leaderCommit);
    builder.setInitializing(initializing);

    final boolean hasEntries = entries != null && !entries.isEmpty();
    if (hasEntries) {
      builder.addAllEntries(entries);
    }
    if (previous != null) {
      builder.setPreviousLog(toTermIndexProto(previous));
    }
    return builder.build();
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java new file mode 100644 index 0000000..8608fc4 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.MemoryRaftLog; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.server.storage.SegmentedRaftLog; +import org.apache.ratis.server.storage.SnapshotManager; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.ProtoUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * Common states of a raft peer. Protected by RaftServer's lock. 
+ */ +public class ServerState implements Closeable { + private final String selfId; + private final RaftServerImpl server; + /** Raft log */ + private final RaftLog log; + /** Raft configuration */ + private final ConfigurationManager configurationManager; + /** The thread that applies committed log entries to the state machine */ + private final StateMachineUpdater stateMachineUpdater; + /** local storage for log and snapshot */ + private final RaftStorage storage; + private final SnapshotManager snapshotManager; + + /** + * Latest term server has seen. initialized to 0 on first boot, increases + * monotonically. + */ + private long currentTerm; + /** + * The server ID of the leader for this term. Null means either there is + * no leader for this term yet or this server does not know who it is yet. + */ + private String leaderId; + /** + * Candidate that this peer granted vote for in current term (or null if none) + */ + private String votedFor; + + /** + * Latest installed snapshot for this server. This maybe different than StateMachine's latest + * snapshot. Once we successfully install a snapshot, the SM may not pick it up immediately. + * Further, this will not get updated when SM does snapshots itself. 
+ */ + private TermIndex latestInstalledSnapshot; + + ServerState(String id, RaftConfiguration conf, RaftProperties prop, + RaftServerImpl server, StateMachine stateMachine) throws IOException { + this.selfId = id; + this.server = server; + configurationManager = new ConfigurationManager(conf); + storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR); + snapshotManager = new SnapshotManager(storage, id); + + long lastApplied = initStatemachine(stateMachine, prop); + + leaderId = null; + log = initLog(id, prop, server, lastApplied); + RaftLog.Metadata metadata = log.loadMetadata(); + currentTerm = metadata.getTerm(); + votedFor = metadata.getVotedFor(); + + stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log, + lastApplied, prop); + } + + /** + * Used by tests to set initial raft configuration with correct port bindings. + */ + @VisibleForTesting + public void setInitialConf(RaftConfiguration initialConf) { + configurationManager.setInitialConf(initialConf); + } + + private long initStatemachine(StateMachine sm, RaftProperties properties) + throws IOException { + sm.initialize(selfId, properties, storage); + storage.setStateMachineStorage(sm.getStateMachineStorage()); + SnapshotInfo snapshot = sm.getLatestSnapshot(); + + if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) { + return RaftServerConstants.INVALID_LOG_INDEX; + } + + // get the raft configuration from the snapshot + RaftConfiguration raftConf = sm.getRaftConfiguration(); + if (raftConf != null) { + configurationManager.addConfiguration(raftConf.getLogEntryIndex(), + raftConf); + } + return snapshot.getIndex(); + } + + void start() { + stateMachineUpdater.start(); + } + + /** + * note we do not apply log entries to the state machine here since we do not + * know whether they have been committed. 
+ */ + private RaftLog initLog(String id, RaftProperties prop, RaftServerImpl server, + long lastIndexInSnapshot) throws IOException { + final RaftLog log; + if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY, + RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) { + log = new MemoryRaftLog(id); + } else { + log = new SegmentedRaftLog(id, server, this.storage, + lastIndexInSnapshot, prop); + } + log.open(configurationManager, lastIndexInSnapshot); + return log; + } + + public RaftConfiguration getRaftConf() { + return configurationManager.getCurrent(); + } + + @VisibleForTesting + + public String getSelfId() { + return this.selfId; + } + + public long getCurrentTerm() { + return currentTerm; + } + + void setCurrentTerm(long term) { + currentTerm = term; + } + + String getLeaderId() { + return leaderId; + } + + boolean hasLeader() { + return leaderId != null; + } + + /** + * Become a candidate and start leader election + */ + long initElection() { + votedFor = selfId; + leaderId = null; + return ++currentTerm; + } + + void persistMetadata() throws IOException { + this.log.writeMetadata(currentTerm, votedFor); + } + + void resetLeaderAndVotedFor() { + votedFor = null; + leaderId = null; + } + + /** + * Vote for a candidate and update the local state. + */ + void grantVote(String candidateId) { + votedFor = candidateId; + leaderId = null; + } + + void setLeader(String leaderId) { + this.leaderId = leaderId; + } + + void becomeLeader() { + leaderId = selfId; + } + + public RaftLog getLog() { + return log; + } + + long applyLog(TransactionContext operation) throws IOException { + return log.append(currentTerm, operation); + } + + /** + * Check if accept the leader selfId and term from the incoming AppendEntries rpc. + * If accept, update the current state. 
+ * @return true if the check passes + */ + boolean recognizeLeader(String leaderId, long leaderTerm) { + if (leaderTerm < currentTerm) { + return false; + } else if (leaderTerm > currentTerm || this.leaderId == null) { + // If the request indicates a term that is greater than the current term + // or no leader has been set for the current term, make sure to update + // leader and term later + return true; + } + Preconditions.checkArgument(this.leaderId.equals(leaderId), + "selfId:%s, this.leaderId:%s, received leaderId:%s", + selfId, this.leaderId, leaderId); + return true; + } + + /** + * Check if the candidate's term is acceptable + */ + boolean recognizeCandidate(String candidateId, + long candidateTerm) { + if (candidateTerm > currentTerm) { + return true; + } else if (candidateTerm == currentTerm) { + // has not voted yet or this is a retry + return votedFor == null || votedFor.equals(candidateId); + } + return false; + } + + boolean isLogUpToDate(TermIndex candidateLastEntry) { + LogEntryProto lastEntry = log.getLastEntry(); + // need to take into account snapshot + SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot(); + if (lastEntry == null && snapshot == null) { + return true; + } else if (candidateLastEntry == null) { + return false; + } + TermIndex local = ServerProtoUtils.toTermIndex(lastEntry); + if (local == null || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) { + local = snapshot.getTermIndex(); + } + return local.compareTo(candidateLastEntry) <= 0; + } + + @Override + public String toString() { + return selfId + ":t" + currentTerm + ", leader=" + leaderId + + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + getRaftConf(); + } + + boolean isConfCommitted() { + return getLog().getLastCommittedIndex() >= + getRaftConf().getLogEntryIndex(); + } + + public void setRaftConf(long logIndex, RaftConfiguration conf) { + configurationManager.addConfiguration(logIndex, conf); + RaftServerImpl.LOG.info("{}: 
successfully update the configuration {}", + getSelfId(), conf); + } + + void updateConfiguration(LogEntryProto[] entries) { + if (entries != null && entries.length > 0) { + configurationManager.removeConfigurations(entries[0].getIndex()); + for (LogEntryProto entry : entries) { + if (ProtoUtils.isConfigurationLogEntry(entry)) { + final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration( + entry.getIndex(), entry.getConfigurationEntry()); + configurationManager.addConfiguration(entry.getIndex(), conf); + server.getServerRpc().addPeers(conf.getPeers()); + } + } + } + } + + void updateStatemachine(long majorityIndex, long currentTerm) { + log.updateLastCommitted(majorityIndex, currentTerm); + stateMachineUpdater.notifyUpdater(); + } + + void reloadStateMachine(long lastIndexInSnapshot, long currentTerm) + throws IOException { + log.updateLastCommitted(lastIndexInSnapshot, currentTerm); + + stateMachineUpdater.reloadStateMachine(); + } + + @Override + public void close() throws IOException { + stateMachineUpdater.stop(); + RaftServerImpl.LOG.info("{} closes. 
The last applied log index is {}", + getSelfId(), getLastAppliedIndex()); + storage.close(); + } + + @VisibleForTesting + public RaftStorage getStorage() { + return storage; + } + + void installSnapshot(InstallSnapshotRequestProto request) throws IOException { + // TODO: verify that we need to install the snapshot + StateMachine sm = server.getStateMachine(); + sm.pause(); // pause the SM to prepare for install snapshot + snapshotManager.installSnapshot(sm, request); + log.syncWithSnapshot(request.getTermIndex().getIndex()); + this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex( + request.getTermIndex()); + } + + SnapshotInfo getLatestSnapshot() { + return server.getStateMachine().getStateMachineStorage().getLatestSnapshot(); + } + + public TermIndex getLatestInstalledSnapshot() { + return latestInstalledSnapshot; + } + + @VisibleForTesting + public long getLastAppliedIndex() { + return stateMachineUpdater.getLastAppliedIndex(); + } + + boolean isCurrentConfCommitted() { + return getRaftConf().getLogEntryIndex() <= getLog().getLastCommittedIndex(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerUtils.java new file mode 100644 index 0000000..3dc9ab4 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerUtils.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.ratis.server.protocol.TermIndex; + +
/** Server utilities for internal use. */
public class ServerUtils {
  /** Create a {@link TermIndex} holding the given term and log index. */
  public static TermIndex newTermIndex(long term, long index) {
    return new TermIndexImpl(term, index);
  }

  /** Immutable {@link TermIndex}, ordered by term first and by index second. */
  private static class TermIndexImpl implements TermIndex {
    private final long term;
    private final long index; //log index; first index is 1.

    TermIndexImpl(long term, long logIndex) {
      this.term = term;
      this.index = logIndex;
    }

    @Override
    public long getTerm() {
      return term;
    }

    @Override
    public long getIndex() {
      return index;
    }

    @Override
    public int compareTo(TermIndex that) {
      final int byTerm = Long.compare(this.getTerm(), that.getTerm());
      if (byTerm != 0) {
        return byTerm;
      }
      // same term: the log index decides
      return Long.compare(this.getIndex(), that.getIndex());
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      // instanceof is false for null, so no separate null check is needed
      if (!(obj instanceof TermIndexImpl)) {
        return false;
      }
      final TermIndexImpl other = (TermIndexImpl) obj;
      final boolean sameTerm = this.getTerm() == other.getTerm();
      return sameTerm && this.getIndex() == other.getIndex();
    }

    @Override
    public int hashCode() {
      final HashCodeBuilder hash = new HashCodeBuilder();
      hash.append(term);
      hash.append(index);
      return hash.hashCode();
    }

    /** @return "~" for a negative number, its decimal form otherwise. */
    private static String toString(long n) {
      if (n < 0) {
        return "~";
      }
      return Long.toString(n);
    }

    @Override
    public String toString() {
      final String t = toString(term);
      final String i = toString(index);
      return "(t:" + t + ", i:" + i + ")";
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java new file mode 100644 index 0000000..a2a4149 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.ratis.server.impl; + +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.LifeCycle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * This class tracks the log entries that have been committed in a quorum and + * applies them to the state machine. We let a separate thread do this work + * asynchronously so that this will not block normal raft protocol. + * + * If the auto log compaction is enabled, the state machine updater thread will + * trigger a snapshot of the state machine by calling + * {@link StateMachine#takeSnapshot} when the log size exceeds a limit. 
+ */
class StateMachineUpdater implements Runnable {
  static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);

  /** Life cycle of the updater thread. */
  enum State {
    RUNNING, STOP, RELOAD
  }

  private final RaftProperties properties;
  private final StateMachine stateMachine;
  private final RaftServerImpl server;
  private final RaftLog raftLog;

  /** Index of the last log entry handed to the state machine. */
  private volatile long lastAppliedIndex;

  private final boolean autoSnapshotEnabled;
  /** Number of applied entries since the last snapshot that triggers a new one. */
  private final long snapshotThreshold;
  private long lastSnapshotIndex;

  private final Thread updater;
  private volatile State state = State.RUNNING;

  /**
   * @param lastAppliedIndex the index already reflected in the state machine;
   *                         also used as the initial last-snapshot index.
   */
  StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
      RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
    this.properties = properties;
    this.stateMachine = stateMachine;
    this.server = server;
    this.raftLog = raftLog;

    this.lastAppliedIndex = lastAppliedIndex;
    lastSnapshotIndex = lastAppliedIndex;

    autoSnapshotEnabled = properties.getBoolean(
        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY,
        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT);
    snapshotThreshold = properties.getLong(
        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY,
        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT);
    updater = new Daemon(this);
  }

  void start() {
    updater.start();
  }

  /**
   * Mark the updater as stopped, interrupt its thread and close the state
   * machine. Closing is best-effort: a failure is logged rather than
   * propagated so that the caller's shutdown can continue.
   */
  void stop() {
    state = State.STOP;
    updater.interrupt();
    try {
      stateMachine.close();
    } catch (IOException e) {
      // previously swallowed silently; keep shutting down but leave a trace
      LOG.warn("{}: failed to close the state machine", this, e);
    }
  }

  /** Ask the updater thread to reinitialize the state machine from its snapshot. */
  void reloadStateMachine() {
    state = State.RELOAD;
    notifyUpdater();
  }

  /** Wake up the updater thread if it is waiting for newly committed entries. */
  synchronized void notifyUpdater() {
    notifyAll();
  }

  @Override
  public String toString() {
    return this.getClass().getSimpleName() + "-" + raftLog.getSelfId();
  }

  /**
   * Main loop: wait until the committed index passes the applied index,
   * reload the state machine if requested, apply the committed entries in
   * order and finally take a snapshot when the threshold is reached.
   * An interrupt while not stopping, or any other Throwable, terminates the
   * process through {@link ExitUtils}.
   */
  @Override
  public void run() {
    final RaftStorage storage = server.getState().getStorage();
    while (isRunning()) {
      try {
        synchronized (this) {
          // when the peers just start, the committedIndex is initialized as 0
          // and will be updated only after the leader contacts other peers.
          // Thus initially lastAppliedIndex can be greater than lastCommitted.
          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) {
            wait();
          }
        }

        final long committedIndex = raftLog.getLastCommittedIndex();
        Preconditions.checkState(lastAppliedIndex < committedIndex);

        if (state == State.RELOAD) {
          Preconditions.checkState(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);

          stateMachine.reinitialize(server.getId(), properties, storage);

          SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
          Preconditions.checkState(snapshot != null && snapshot.getIndex() > lastAppliedIndex,
              "Snapshot: %s, lastAppliedIndex: %s", snapshot, lastAppliedIndex);

          // everything up to the snapshot index is now reflected in the state machine
          lastAppliedIndex = snapshot.getIndex();
          lastSnapshotIndex = snapshot.getIndex();
          state = State.RUNNING;
        }

        while (lastAppliedIndex < committedIndex) {
          final LogEntryProto next = raftLog.get(lastAppliedIndex + 1);
          if (next != null) {
            if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
              // the reply should have already been set. only need to record
              // the new conf in the state machine.
              stateMachine.setRaftConfiguration(
                  ServerProtoUtils.toRaftConfiguration(next.getIndex(),
                      next.getConfigurationEntry()));
            } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
              // check whether there is a TransactionContext because we are the leader.
              TransactionContext trx = server.getTransactionContext(next.getIndex());
              if (trx == null) {
                trx = new TransactionContext(stateMachine, next);
              }

              // Let the StateMachine inject logic for committed transactions in sequential order.
              trx = stateMachine.applyTransactionSerial(trx);

              // TODO: This step can be parallelized
              CompletableFuture<Message> messageFuture =
                  stateMachine.applyTransaction(trx);
              server.replyPendingRequest(next.getIndex(), messageFuture);
            }
            lastAppliedIndex++;
          } else {
            LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
                this, lastAppliedIndex + 1, state);
            break;
          }
        }

        // check if need to trigger a snapshot
        if (shouldTakeSnapshot(lastAppliedIndex)) {
          stateMachine.takeSnapshot();
          // TODO purge logs, including log cache. but should keep log for leader's RPCSenders
          lastSnapshotIndex = lastAppliedIndex;
        }
      } catch (InterruptedException e) {
        if (!isRunning()) {
          // expected: stop() interrupts this thread after setting State.STOP
          LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this);
        } else {
          final String s = this + ": the StateMachineUpdater is wrongly interrupted";
          ExitUtils.terminate(1, s, e, LOG);
        }
      } catch (Throwable t) {
        final String s = this + ": the StateMachineUpdater hits Throwable";
        ExitUtils.terminate(2, s, t, LOG);
      }
    }
  }

  private boolean isRunning() {
    return state != State.STOP;
  }

  /**
   * @return true if auto snapshot is enabled, no reload is pending and at
   *         least {@code snapshotThreshold} entries were applied since the
   *         last snapshot.
   */
  private boolean shouldTakeSnapshot(long currentAppliedIndex) {
    return autoSnapshotEnabled && (state != State.RELOAD) &&
        (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold);
  }

  long getLastAppliedIndex() {
    return lastAppliedIndex;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java new file mode 100644 index 0000000..f1f5512 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.protocol; + +import java.io.IOException; + +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; + +
/**
 * The three server rpcs. Each method receives a request proto and returns
 * the corresponding reply proto.
 */
public interface RaftServerProtocol {

  /**
   * Process a request-vote rpc.
   *
   * @param request the vote request.
   * @return the reply to the request.
   * @throws IOException if the request cannot be processed.
   */
  RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
      throws IOException;

  /**
   * Process an append-entries rpc.
   *
   * @param request the append-entries request.
   * @return the reply to the request.
   * @throws IOException if the request cannot be processed.
   */
  AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
      throws IOException;

  /**
   * Process an install-snapshot rpc.
   *
   * @param request the install-snapshot request.
   * @return the reply to the request.
   * @throws IOException if the request cannot be processed.
   */
  InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request)
      throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java new file mode 100644 index 0000000..665f5d5 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java @@ -0,0 +1,36 @@ +/** + *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.protocol; + +import org.apache.ratis.server.impl.ServerUtils; + +
/**
 * A (term, log index) pair as defined in the Raft consensus algorithm.
 * Instances are comparable with each other.
 */
public interface TermIndex extends Comparable<TermIndex> {
  /** @return the term. */
  long getTerm();

  /** @return the log index. */
  long getIndex();

  /**
   * Factory method; delegates to {@link ServerUtils#newTermIndex(long, long)}.
   *
   * @param term  the term.
   * @param index the log index.
   * @return a new {@link TermIndex} holding the given values.
   */
  static TermIndex newTermIndex(long term, long index) {
    final TermIndex created = ServerUtils.newTermIndex(term, index);
    return created;
  }
}
+ + http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java new file mode 100644 index 0000000..0dfc6f0 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.channels.FileChannel; + +
/**
 * Base class of the buffered channels: holds the wrapped {@link FileChannel}
 * and offers access to it that first checks the channel is still open.
 */
public abstract class BufferedChannelBase implements Closeable {
  /** The wrapped channel. */
  protected final FileChannel fileChannel;

  /** @param fc the channel to wrap. */
  protected BufferedChannelBase(FileChannel fc) {
    this.fileChannel = fc;
  }

  /**
   * @return the wrapped channel.
   * @throws IOException if the channel has already been closed.
   */
  protected FileChannel validateAndGetFileChannel() throws IOException {
    if (fileChannel.isOpen()) {
      return fileChannel;
    }
    throw new IOException(
        "Attempting to access a file channel that has already been closed");
  }

  /**
   * Get the current size of the underlying FileChannel.
   *
   * @throws IOException if the channel is closed or its size cannot be read.
   */
  public long size() throws IOException {
    final FileChannel openChannel = validateAndGetFileChannel();
    return openChannel.size();
  }

  /**
   * Get the {@link FileChannel} that this BufferedChannel wraps around.
   * Unlike {@link #validateAndGetFileChannel()}, no open check is done.
   */
  public FileChannel getFileChannel() {
    return fileChannel;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java new file mode 100644 index 0000000..e61a4d3 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Provides a buffering layer in front of a FileChannel for writing. + */ +public class BufferedWriteChannel extends BufferedChannelBase { + // The capacity of the write buffer. + private final int writeCapacity; + // The position of the file channel's write pointer.
+ private AtomicLong writeBufferStartPosition = new AtomicLong(0); + // The buffer used to write operations. + private final ByteBuffer writeBuffer; + // The absolute position of the next write operation. + private volatile long position; + + public BufferedWriteChannel(FileChannel fc, int writeCapacity) + throws IOException { + super(fc); + this.writeCapacity = writeCapacity; + this.position = fc.position(); + this.writeBufferStartPosition.set(position); + this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity); + } + + /** + * Write all the data in src to the {@link FileChannel}. Note that this function can + * buffer or re-order writes based on the implementation. These writes will be flushed + * to the disk only when flush() is invoked. + * + * @param src The source ByteBuffer which contains the data to be written. + * @throws IOException if a write operation fails. + */ + public void write(ByteBuffer src) throws IOException { + int copied = 0; + while (src.remaining() > 0) { + int truncated = 0; + if (writeBuffer.remaining() < src.remaining()) { + truncated = src.remaining() - writeBuffer.remaining(); + src.limit(src.limit() - truncated); + } + copied += src.remaining(); + writeBuffer.put(src); + src.limit(src.limit() + truncated); + // if we have run out of buffer space, we should flush to the file + if (writeBuffer.remaining() == 0) { + flushInternal(); + } + } + position += copied; + } + + /** + * Write the specified byte. 
+ * @param b the byte to be written + */ + public void write(int b) throws IOException { + writeBuffer.put((byte) b); + if (writeBuffer.remaining() == 0) { + flushInternal(); + } + position++; + } + + public void write(byte[] b) throws IOException { + int offset = 0; + while (offset < b.length) { + int toPut = Math.min(b.length - offset, writeBuffer.remaining()); + writeBuffer.put(b, offset, toPut); + offset += toPut; + if (writeBuffer.remaining() == 0) { + flushInternal(); + } + } + position += b.length; + } + + /** + * Get the position where the next write operation will begin writing from. + */ + public long position() { + return position; + } + + /** + * Get the position of the file channel's write pointer. + */ + public long getFileChannelPosition() { + return writeBufferStartPosition.get(); + } + + + /** + * Write any data in the buffer to the file. If sync is set to true, force a + * sync operation so that data is persisted to the disk. + * + * @throws IOException if the write or sync operation fails. + */ + public void flush(boolean shouldForceWrite) throws IOException { + synchronized (this) { + flushInternal(); + } + if (shouldForceWrite) { + forceWrite(false); + } + } + + /** + * Write any data in the buffer to the file and advance the writeBufferPosition + * Callers are expected to synchronize appropriately + * + * @throws IOException if the write fails. 
+ */ + private void flushInternal() throws IOException { + writeBuffer.flip(); + do { + fileChannel.write(writeBuffer); + } while (writeBuffer.hasRemaining()); + writeBuffer.clear(); + writeBufferStartPosition.set(fileChannel.position()); + } + + public long forceWrite(boolean forceMetadata) throws IOException { + // This is the point up to which we had flushed to the file system page cache + // before issuing this force write hence is guaranteed to be made durable by + // the force write, any flush that happens after this may or may + // not be flushed + long positionForceWrite = writeBufferStartPosition.get(); + fileChannel.force(forceMetadata); + return positionForceWrite; + } + + @Override + public void close() throws IOException { + fileChannel.close(); + writeBuffer.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/FileInfo.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileInfo.java new file mode 100644 index 0000000..9673596 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileInfo.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import java.nio.file.Path; + +import org.apache.ratis.io.MD5Hash; + +/** + * Metadata about a file. + * + * The objects of this class are immutable. + */ +public class FileInfo { + private final Path path; + private final MD5Hash fileDigest; + private final long fileSize; + + public FileInfo(Path path, MD5Hash fileDigest) { + this.path = path; + this.fileDigest = fileDigest; + this.fileSize = path.toFile().length(); + } + + @Override + public String toString() { + return path.toString(); + } + + /** @return the path of the file. */ + public Path getPath() { + return path; + } + + /** @return the MD5 file digest of the file. */ + public MD5Hash getFileDigest() { + return fileDigest; + } + + /** @return the size of the file. */ + public long getFileSize() { + return fileSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java new file mode 100644 index 0000000..60572c6 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ratis.server.storage;

import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;

import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.RaftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

/**
 * A stream of {@link LogEntryProto}s read from a single log segment file.
 *
 * The underlying {@link LogReader} is created lazily on the first call to
 * {@link #nextEntry()}. The stream moves through the states
 * UNINIT -> OPEN -> CLOSED; a failed initialization goes straight to CLOSED.
 */
public class LogInputStream implements Closeable {
  static final Logger LOG = LoggerFactory.getLogger(LogInputStream.class);

  /** The result of scanning a log file with scanEditLog. */
  static class LogValidation {
    private final long validLength;
    private final long endIndex;
    private final boolean hasCorruptHeader;

    LogValidation(long validLength, long endIndex, boolean hasCorruptHeader) {
      this.validLength = validLength;
      this.endIndex = endIndex;
      this.hasCorruptHeader = hasCorruptHeader;
    }

    long getValidLength() {
      return validLength;
    }

    long getEndIndex() {
      return endIndex;
    }

    boolean hasCorruptHeader() {
      return hasCorruptHeader;
    }
  }

  private enum State {
    UNINIT,
    OPEN,
    CLOSED
  }

  private final File logFile;
  private final long startIndex;
  private final long endIndex;
  private final boolean isOpen;
  private State state = State.UNINIT;
  private LogReader reader;

  /**
   * @param log the log segment file to read.
   * @param startIndex the index of the first entry of the segment.
   * @param endIndex the index of the last entry of the segment; must be
   *                 INVALID_LOG_INDEX when isOpen is true and must not be
   *                 smaller than startIndex otherwise.
   * @param isOpen whether the segment is an open segment.
   * @throws IllegalArgumentException if endIndex violates the rule above.
   */
  public LogInputStream(File log, long startIndex, long endIndex,
      boolean isOpen) {
    if (isOpen) {
      Preconditions.checkArgument(endIndex == INVALID_LOG_INDEX);
    } else {
      Preconditions.checkArgument(endIndex >= startIndex);
    }

    this.logFile = log;
    this.startIndex = startIndex;
    this.endIndex = endIndex;
    this.isOpen = isOpen;
  }

  /**
   * Open the reader and verify the log header.
   *
   * On any failure the reader (if it was created) is closed and the stream
   * moves to CLOSED, so that no file handle is leaked and later calls do not
   * try to initialize again.
   *
   * @throws IOException if the file cannot be opened or the header cannot be read.
   * @throws IllegalStateException if the stream was already initialized or
   *                               the header does not match.
   */
  private void init() throws IOException {
    Preconditions.checkState(state == State.UNINIT);
    boolean initialized = false;
    try {
      reader = new LogReader(logFile);
      // read the log header
      String header = reader.readLogHeader();
      Preconditions.checkState(SegmentedRaftLog.HEADER_STR.equals(header),
          "Corrupted log header: %s", header);
      state = State.OPEN;
      initialized = true;
    } finally {
      if (!initialized) {
        // The reader may have been created before the failure (e.g. the
        // header was truncated or corrupt); checking "reader == null" alone
        // would leave it open forever since close() only closes OPEN streams.
        if (reader != null) {
          try {
            reader.close();
          } catch (Exception e) {
            LOG.warn("Failed to close the reader of {} after a failed"
                + " initialization", this, e);
          }
          reader = null;
        }
        state = State.CLOSED;
      }
    }
  }

  long getStartIndex() {
    return startIndex;
  }

  long getEndIndex() {
    return endIndex;
  }

  String getName() {
    return logFile.getName();
  }

  /**
   * Read the next entry, initializing the stream first if needed.
   *
   * @return the next entry, or null if the reader returned none or the
   *         stream is closed.
   * @throws IOException if initialization or reading fails.
   */
  public LogEntryProto nextEntry() throws IOException {
    LogEntryProto entry = null;
    switch (state) {
      case UNINIT:
        try {
          init();
        } catch (Throwable e) {
          LOG.error("caught exception initializing " + this, e);
          Throwables.propagateIfPossible(e, IOException.class);
        }
        Preconditions.checkState(state != State.UNINIT);
        return nextEntry();
      case OPEN:
        entry = reader.readEntry();
        if (entry != null) {
          long index = entry.getIndex();
          if (!isOpen() && index >= endIndex) {
            /**
             * The end index may be derived from the segment recovery
             * process. It is possible that we still have some uncleaned garbage
             * in the end. We should skip them.
             */
            long skipAmt = logFile.length() - reader.getPos();
            if (skipAmt > 0) {
              LOG.debug("skipping {} bytes at the end of log '{}': reached" +
                  " entry {} out of {}", skipAmt, getName(), index, endIndex);
              reader.skipFully(skipAmt);
            }
          }
        }
        break;
      case CLOSED:
        break; // return null
    }
    return entry;
  }

  /**
   * Scan over the next entry using the reader's scanEntry().
   *
   * @return the value returned by the reader's scanEntry(); scanEditLog
   *         treats it as the index of the scanned entry.
   * @throws IllegalStateException if the stream is not OPEN.
   */
  long scanNextEntry() throws IOException {
    Preconditions.checkState(state == State.OPEN);
    return reader.scanEntry();
  }

  /** @return the reader position if the stream is OPEN, otherwise 0. */
  long getPosition() {
    if (state == State.OPEN) {
      return reader.getPos();
    } else {
      return 0;
    }
  }

  /** Close the reader if it is open and mark the stream CLOSED. */
  @Override
  public void close() throws IOException {
    if (state == State.OPEN) {
      reader.close();
    }
    state = State.CLOSED;
  }

  /** @return the isOpen flag given at construction (not the stream state). */
  boolean isOpen() {
    return isOpen;
  }

  @Override
  public String toString() {
    return getName();
  }

  /**
   * Open the given file, verify its header and scan it for valid entries.
   *
   * @param file File being scanned and validated.
   * @param maxTxIdToScan Maximum entry index to try to scan.
   *                      The scan returns after reading this or a higher
   *                      index. The file portion beyond this index is
   *                      potentially being updated.
   * @return Result of the validation; if reading the header hits the end of
   *         file, a result with zero valid length, INVALID_LOG_INDEX as end
   *         index and the corrupt-header flag set.
   * @throws IOException if the file cannot be opened or its header cannot be
   *                     read for a reason other than reaching the end of file.
   */
  static LogValidation scanEditLog(File file, long maxTxIdToScan)
      throws IOException {
    LogInputStream in;
    try {
      in = new LogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false);
      // read the header, initialize the inputstream
      in.init();
    } catch (EOFException e) {
      // init() has already released the reader, nothing is left to close here
      LOG.warn("Log file " + file + " has no valid header", e);
      return new LogValidation(0, INVALID_LOG_INDEX, true);
    }

    try {
      return scanEditLog(in, maxTxIdToScan);
    } finally {
      RaftUtils.cleanup(LOG, in);
    }
  }

  /**
   * Find the last valid entry index in the stream.
   * If there are invalid or corrupt entries in the middle of the stream,
   * scanEditLog will skip over them.
   *
   * This reads through the stream but does not close it.
   *
   * NOTE(review): after a failure the loop retries with nextEntry(); if that
   * call keeps throwing without the reader making progress, the loop would
   * not terminate -- confirm that the reader advances on errors.
   *
   * @param in the stream to scan.
   * @param maxIndexToScan Maximum entry index to try to scan. The scan returns
   *                       after reading this or a higher index. The file
   *                       portion beyond this index is potentially being
   *                       updated.
   * @return the position before the last read attempt and the highest index
   *         seen, with the corrupt-header flag cleared.
   */
  static LogValidation scanEditLog(LogInputStream in, long maxIndexToScan) {
    long lastPos = 0;
    long end = INVALID_LOG_INDEX;
    long numValid = 0;
    boolean hitError = false;
    while (end < maxIndexToScan) {
      long index;
      lastPos = in.getPosition();
      try {
        if (hitError) {
          // the previous attempt failed: fully read the next entry instead
          // of only scanning over it
          LogEntryProto entry = in.nextEntry();
          index = entry != null ? entry.getIndex() : INVALID_LOG_INDEX;
          LOG.warn("After resync, position is " + in.getPosition());
        } else {
          index = in.scanNextEntry();
        }
        if (index == INVALID_LOG_INDEX) {
          break;
        } else {
          hitError = false;
        }
      } catch (Throwable t) {
        LOG.warn("Caught exception after scanning through {} ops from {}"
            + " while determining its valid length. Position was "
            + lastPos, numValid, in, t);
        hitError = true;
        continue;
      }
      if (end == INVALID_LOG_INDEX || index > end) {
        end = index;
      }
      numValid++;
    }
    return new LogValidation(lastPos, end, false);
  }
}