ankitsinghal commented on a change in pull request #25: RATIS-588 LogStream
StateMachine export
URL: https://github.com/apache/incubator-ratis/pull/25#discussion_r295030627
##########
File path:
ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
##########
@@ -452,29 +503,166 @@ public void close() {
- private CompletableFuture<Message> processCloseLog(LogServiceRequestProto
logServiceRequestProto) {
- CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+ private CompletableFuture<Message> processChangeState(LogServiceRequestProto
logServiceRequestProto) {
+ LogServiceProtos.ChangeStateLogRequestProto changeState =
logServiceRequestProto.getChangeState();
// Need to check whether the file is opened if opened close it.
// TODO need to handle exceptions while operating with files.
+ Throwable t = verifyState(State.OPEN);
+ this.state= State.valueOf(changeState.getState().name());
return CompletableFuture.completedFuture(Message
- .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
}
-
-
private CompletableFuture<Message> processGetStateRequest(
LogServiceRequestProto logServiceRequestProto) {
GetStateRequestProto getState = logServiceRequestProto.getGetState();
- return
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
- .toGetStateReplyProto(state == State.OPEN).toByteString()));
+ return CompletableFuture.completedFuture(Message
+
.valueOf(LogServiceProtoUtil.toGetStateReplyProto(state).toByteString()));
}
- private Throwable verifyState(State state) {
- if (this.state != state) {
- return new IOException("Wrong state: " + this.state);
- }
+ private Throwable verifyState(State... states) {
+ for (State state : states) {
+ if (this.state == state) {
return null;
- }
+ }
+ }
+ return new IOException("Wrong state: " + this.state);
+ }
+ private CompletableFuture<Message> updateArchiveLogInfo(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
+ this.archiveLogName =
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+ this.archiveLocation = archiveLog.getLocation();
+ this.lastArchivedIndex = archiveLog.getLastArchivedRaftIndex();
+ Throwable t = verifyState(State.ARCHIVING);
+ return CompletableFuture.completedFuture(
+
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+ }
+
+ private CompletableFuture<Message> processArchiveLog(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
+ LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+ String loc = null;
+ this.isArchival = !archiveLog.getIsExport();
+ if(isArchival) {
+ loc = archiveLocation;
+ }else{
+ loc = archiveLog.getLocation();
+ }
+ if (loc == null) {
+ throw new IllegalArgumentException(isArchival ?
+ "Location for archive is not configured" :
+ "Location for export provided is null");
+ }
+ final String location = loc;
+ String archiveLocationForLog =
LogServiceUtils.getArchiveLocationForLog(location, logName);
+ long recordId = archiveLog.getLastArchivedRaftIndex();
+ try {
+ Throwable t = verifyState(State.CLOSED);
+ if (t == null) {
+ Callable<Boolean> callable = () -> {
+ sendChangeStateRequest(State.ARCHIVING);
+ updateArchivingInfo(recordId, logName, location, isArchival);
+ ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
+ writer.init(archiveLocationForLog,logName);
+ LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
+ reader.seek(0);
+ long records = 0;
+ while (reader.hasNext()) {
+ writer.write(ByteBuffer.wrap(reader.next()));
+ if (records >= DEFAULT_ARCHIVE_THRESHOLD_PER_FILE ||
isNoMoreLeader) {
+ commit(writer, logName, location);
+ if (isNoMoreLeader) {
+ break;
+ }
+ records = 0;
+ }
+ records++;
+ }
+ writer.close();
+ if (!isNoMoreLeader) {
+ sendChangeStateRequest(State.ARCHIVED);
+ } else {
+
sendArchiveLogrequestToNewLeader(writer.getLastWrittenRecordId(), logName,
location);
Review comment:
Agreed.
bq. Would having the metadata quorum drive this archiving logic be a way to
try to prevent that? Maybe there are more problems in trying to do that which
I'm not thinking about?
I would suggest keeping such things with the log quorum itself for scalability;
an ArchivingMonitoring thread at the leader would probably help with this. Let me
keep the code for now (adding just a delay before sending the request, so that a
short-lived leader does not start archiving), but I will change it once I
implement the archiving transition for when the leader dies.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services