joshelser commented on a change in pull request #25: RATIS-588 LogStream
StateMachine export
URL: https://github.com/apache/incubator-ratis/pull/25#discussion_r294460521
##########
File path:
ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
##########
@@ -452,29 +503,166 @@ public void close() {
- private CompletableFuture<Message> processCloseLog(LogServiceRequestProto
logServiceRequestProto) {
- CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+ private CompletableFuture<Message> processChangeState(LogServiceRequestProto
logServiceRequestProto) {
+ LogServiceProtos.ChangeStateLogRequestProto changeState =
logServiceRequestProto.getChangeState();
// Need to check whether the file is opened if opened close it.
// TODO need to handle exceptions while operating with files.
+ Throwable t = verifyState(State.OPEN);
+ this.state= State.valueOf(changeState.getState().name());
return CompletableFuture.completedFuture(Message
- .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
}
-
-
private CompletableFuture<Message> processGetStateRequest(
LogServiceRequestProto logServiceRequestProto) {
GetStateRequestProto getState = logServiceRequestProto.getGetState();
- return
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
- .toGetStateReplyProto(state == State.OPEN).toByteString()));
+ return CompletableFuture.completedFuture(Message
+
.valueOf(LogServiceProtoUtil.toGetStateReplyProto(state).toByteString()));
}
- private Throwable verifyState(State state) {
- if (this.state != state) {
- return new IOException("Wrong state: " + this.state);
- }
+ private Throwable verifyState(State... states) {
+ for (State state : states) {
+ if (this.state == state) {
return null;
- }
+ }
+ }
+ return new IOException("Wrong state: " + this.state);
+ }
+ private CompletableFuture<Message> updateArchiveLogInfo(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
+ this.archiveLogName =
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+ this.archiveLocation = archiveLog.getLocation();
+ this.lastArchivedIndex = archiveLog.getLastArchivedRaftIndex();
+ Throwable t = verifyState(State.ARCHIVING);
+ return CompletableFuture.completedFuture(
+
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+ }
+
+ private CompletableFuture<Message> processArchiveLog(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
+ LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+ String loc = null;
+ this.isArchival = !archiveLog.getIsExport();
+ if(isArchival) {
+ loc = archiveLocation;
+ }else{
+ loc = archiveLog.getLocation();
+ }
+ if (loc == null) {
+ throw new IllegalArgumentException(isArchival ?
+ "Location for archive is not configured" :
+ "Location for export provided is null");
+ }
+ final String location = loc;
+ String archiveLocationForLog =
LogServiceUtils.getArchiveLocationForLog(location, logName);
+ long recordId = archiveLog.getLastArchivedRaftIndex();
+ try {
+ Throwable t = verifyState(State.CLOSED);
+ if (t == null) {
+ Callable<Boolean> callable = () -> {
+ sendChangeStateRequest(State.ARCHIVING);
+ updateArchivingInfo(recordId, logName, location, isArchival);
+ ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
+ writer.init(archiveLocationForLog,logName);
+ LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
+ reader.seek(0);
+ long records = 0;
+ while (reader.hasNext()) {
+ writer.write(ByteBuffer.wrap(reader.next()));
+ if (records >= DEFAULT_ARCHIVE_THRESHOLD_PER_FILE ||
isNoMoreLeader) {
+ commit(writer, logName, location);
+ if (isNoMoreLeader) {
+ break;
+ }
+ records = 0;
+ }
+ records++;
+ }
+ writer.close();
+ if (!isNoMoreLeader) {
+ sendChangeStateRequest(State.ARCHIVED);
+ } else {
+
sendArchiveLogrequestToNewLeader(writer.getLastWrittenRecordId(), logName,
location);
Review comment:
In the case that the active leader of a LogServer quorum bounces around a
lot, we're going to be "chaining" query() RPCs, aren't we? While we wait on
the computation of the archive in "this" instance, we might trigger another
archive in another instance, which triggers another archive in a third
instance, etc. I'd
worry about this cascading through the cluster in a leader election storm.
Would having the metadata quorum drive this archiving logic be a way to try
to prevent that? Maybe there are more problems in trying to do that which I'm
not thinking about?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services