joshelser commented on a change in pull request #25: RATIS-588 LogStream
StateMachine export
URL: https://github.com/apache/incubator-ratis/pull/25#discussion_r294463031
##########
File path:
ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
##########
@@ -452,29 +503,166 @@ public void close() {
- private CompletableFuture<Message> processCloseLog(LogServiceRequestProto
logServiceRequestProto) {
- CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+ private CompletableFuture<Message> processChangeState(LogServiceRequestProto
logServiceRequestProto) {
+ LogServiceProtos.ChangeStateLogRequestProto changeState =
logServiceRequestProto.getChangeState();
// Need to check whether the file is opened if opened close it.
// TODO need to handle exceptions while operating with files.
+ Throwable t = verifyState(State.OPEN);
+ this.state= State.valueOf(changeState.getState().name());
return CompletableFuture.completedFuture(Message
- .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
}
-
-
private CompletableFuture<Message> processGetStateRequest(
LogServiceRequestProto logServiceRequestProto) {
GetStateRequestProto getState = logServiceRequestProto.getGetState();
- return
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
- .toGetStateReplyProto(state == State.OPEN).toByteString()));
+ return CompletableFuture.completedFuture(Message
+
.valueOf(LogServiceProtoUtil.toGetStateReplyProto(state).toByteString()));
}
- private Throwable verifyState(State state) {
- if (this.state != state) {
- return new IOException("Wrong state: " + this.state);
- }
+ private Throwable verifyState(State... states) {
+ for (State state : states) {
+ if (this.state == state) {
return null;
- }
+ }
+ }
+ return new IOException("Wrong state: " + this.state);
+ }
+ private CompletableFuture<Message> updateArchiveLogInfo(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
+ this.archiveLogName =
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+ this.archiveLocation = archiveLog.getLocation();
+ this.lastArchivedIndex = archiveLog.getLastArchivedRaftIndex();
+ Throwable t = verifyState(State.ARCHIVING);
+ return CompletableFuture.completedFuture(
+
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+ }
+
+ private CompletableFuture<Message> processArchiveLog(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
+ LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+ String loc = null;
+ this.isArchival = !archiveLog.getIsExport();
+ if(isArchival) {
+ loc = archiveLocation;
+ }else{
+ loc = archiveLog.getLocation();
+ }
+ if (loc == null) {
+ throw new IllegalArgumentException(isArchival ?
+ "Location for archive is not configured" :
+ "Location for export provided is null");
+ }
+ final String location = loc;
+ String archiveLocationForLog =
LogServiceUtils.getArchiveLocationForLog(location, logName);
+ long recordId = archiveLog.getLastArchivedRaftIndex();
+ try {
+ Throwable t = verifyState(State.CLOSED);
+ if (t == null) {
+ Callable<Boolean> callable = () -> {
+ sendChangeStateRequest(State.ARCHIVING);
+ updateArchivingInfo(recordId, logName, location, isArchival);
+ ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
+ writer.init(archiveLocationForLog,logName);
+ LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
+ reader.seek(0);
+ long records = 0;
+ while (reader.hasNext()) {
+ writer.write(ByteBuffer.wrap(reader.next()));
+ if (records >= DEFAULT_ARCHIVE_THRESHOLD_PER_FILE ||
isNoMoreLeader) {
+ commit(writer, logName, location);
+ if (isNoMoreLeader) {
+ break;
+ }
+ records = 0;
+ }
+ records++;
+ }
+ writer.close();
+ if (!isNoMoreLeader) {
+ sendChangeStateRequest(State.ARCHIVED);
+ } else {
+
sendArchiveLogrequestToNewLeader(writer.getLastWrittenRecordId(), logName,
location);
+ }
+ return true;
+ };
+ archiveFuture = executorService.submit(callable);
+ }
+ return CompletableFuture.completedFuture(
+
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+ } catch (Throwable e) {
+ return CompletableFuture.completedFuture(
+
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(e).toByteString()));
+ }
+ }
+
+ private void sendArchiveLogrequestToNewLeader(long recordId, LogName
logName, String location)
+ throws IOException {
+ getClient().sendReadOnly(
+ () -> LogServiceProtoUtil.toArchiveLogRequestProto(logName, location,
recordId, isArchival)
+ .toByteString());
+ }
+
+
+ private void commit(ArchiveLogWriter writer, LogName logName, String
location)
+ throws IOException {
+ writer.rollWriter();
+ updateArchivingInfo(writer.getLastWrittenRecordId(), logName, location,
isArchival);
+ }
+
+ private void updateArchivingInfo(long recordId, LogName logName, String
location,
+ boolean isArchival)
+ throws IOException {
+ RaftClientReply archiveLogReply = getClient().send(
+ () -> LogServiceProtoUtil.toArchiveLogRequestProto(logName, location,
recordId, isArchival)
+ .toByteString());
+ LogServiceProtos.ArchiveLogReplyProto
message=LogServiceProtos.ArchiveLogReplyProto
+ .parseFrom(archiveLogReply.getMessage().getContent());
+ if (message.hasException()) {
+ throw new IOException(message.getException().getErrorMsg());
+ }
+ }
+
+ private void sendChangeStateRequest(State state) throws IOException {
+ if (isArchival) {
+ getClient().send(
+ () ->
LogServiceProtoUtil.toChangeStateRequestProto(LogName.of("Dummy"), state)
+ .toByteString());
+ }
+
+ }
+
+ private RaftClient getClient() throws IOException {
+ if (client == null) {
+ try {
+ RaftServer raftServer = server.get();
+ client =
RaftClient.newBuilder().setRaftGroup(getGroupFromGroupId(raftServer, groupId))
+ .setClientId(ClientId.randomId())
+ .setProperties(raftServer.getProperties()).build();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ return client;
+ }
+
+ private RaftGroup getGroupFromGroupId(RaftServer raftServer, RaftGroupId
raftGroupId)
+ throws IOException {
+ List<RaftGroup> x =
StreamSupport.stream(raftServer.getGroups().spliterator(), false)
+ .filter(group ->
group.getGroupId().equals(raftGroupId)).collect(Collectors.toList());
+ if (x.size() == 1) {
+ return x.get(0);
+ } else {
+ throw new GroupMismatchException(x.size() + " are group found for group
id:" + raftGroupId);
+ }
+ }
+
+ @Override public void notifyNotLeader(Collection<TransactionContext>
pendingEntries)
+ throws IOException {
+ isNoMoreLeader = true;
Review comment:
Is it possible to "interrupt" the pending transactions? Could we
keep a `Map<TransactionContext, Callable>` where each callable is the one doing
the archiving? That might be cleaner.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services