ankitsinghal commented on a change in pull request #25: RATIS-588 LogStream 
StateMachine export
URL: https://github.com/apache/incubator-ratis/pull/25#discussion_r295040095
 
 

 ##########
 File path: 
ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
 ##########
 @@ -452,29 +503,166 @@ public void close() {
 
 
 
-  private CompletableFuture<Message> processCloseLog(LogServiceRequestProto 
logServiceRequestProto) {
-    CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+  private CompletableFuture<Message> processChangeState(LogServiceRequestProto 
logServiceRequestProto) {
+    LogServiceProtos.ChangeStateLogRequestProto changeState = 
logServiceRequestProto.getChangeState();
     // Need to check whether the file is opened if opened close it.
     // TODO need to handle exceptions while operating with files.
+    Throwable t = verifyState(State.OPEN);
+    this.state= State.valueOf(changeState.getState().name());
     return CompletableFuture.completedFuture(Message
-      .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+      
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
   }
 
-
-
   private CompletableFuture<Message> processGetStateRequest(
       LogServiceRequestProto logServiceRequestProto) {
     GetStateRequestProto getState = logServiceRequestProto.getGetState();
-    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-        .toGetStateReplyProto(state == State.OPEN).toByteString()));
+    return CompletableFuture.completedFuture(Message
+        
.valueOf(LogServiceProtoUtil.toGetStateReplyProto(state).toByteString()));
   }
 
-  private Throwable verifyState(State state) {
-       if (this.state != state) {
-          return new IOException("Wrong state: " + this.state);
-        }
+  private Throwable verifyState(State... states) {
+    for (State state : states) {
+      if (this.state == state) {
         return null;
-   }
+      }
+    }
+    return new IOException("Wrong state: " + this.state);
+  }
 
+  private CompletableFuture<Message> updateArchiveLogInfo(
+      LogServiceRequestProto logServiceRequestProto) {
+    LogServiceProtos.ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
+    this.archiveLogName = 
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+    this.archiveLocation = archiveLog.getLocation();
+    this.lastArchivedIndex = archiveLog.getLastArchivedRaftIndex();
+    Throwable t = verifyState(State.ARCHIVING);
+    return CompletableFuture.completedFuture(
+        
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+  }
+
+  private CompletableFuture<Message> processArchiveLog(
+      LogServiceRequestProto logServiceRequestProto) {
+    LogServiceProtos.ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
+    LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+    String loc = null;
+    this.isArchival = !archiveLog.getIsExport();
+    if(isArchival) {
+      loc = archiveLocation;
+    }else{
+      loc = archiveLog.getLocation();
+    }
+    if (loc == null) {
+      throw new IllegalArgumentException(isArchival ?
+          "Location for archive is not configured" :
+          "Location for export provided is null");
+    }
+    final String location = loc;
+    String archiveLocationForLog = 
LogServiceUtils.getArchiveLocationForLog(location, logName);
+    long recordId = archiveLog.getLastArchivedRaftIndex();
+    try {
+      Throwable t = verifyState(State.CLOSED);
+      if (t == null) {
+        Callable<Boolean> callable = () -> {
+            sendChangeStateRequest(State.ARCHIVING);
+            updateArchivingInfo(recordId, logName, location, isArchival);
+            ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
+            writer.init(archiveLocationForLog,logName);
+            LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
+            reader.seek(0);
+            long records = 0;
+            while (reader.hasNext()) {
+              writer.write(ByteBuffer.wrap(reader.next()));
+              if (records >= DEFAULT_ARCHIVE_THRESHOLD_PER_FILE || 
isNoMoreLeader) {
+                commit(writer, logName, location);
+                if (isNoMoreLeader) {
+                  break;
+                }
+                records = 0;
+              }
+              records++;
+            }
+            writer.close();
+            if (!isNoMoreLeader) {
+              sendChangeStateRequest(State.ARCHIVED);
+            } else {
+              
sendArchiveLogrequestToNewLeader(writer.getLastWrittenRecordId(), logName, 
location);
+            }
+            return true;
+          };
+        archiveFuture = executorService.submit(callable);
+      }
+      return CompletableFuture.completedFuture(
+          
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+    } catch (Throwable e) {
+      return CompletableFuture.completedFuture(
+          
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(e).toByteString()));
+    }
+  }
+
+  private void sendArchiveLogrequestToNewLeader(long recordId, LogName 
logName, String location)
+      throws IOException {
+    getClient().sendReadOnly(
+        () -> LogServiceProtoUtil.toArchiveLogRequestProto(logName, location, 
recordId, isArchival)
+            .toByteString());
+  }
+
+
+  private void commit(ArchiveLogWriter writer, LogName logName, String 
location)
+      throws IOException {
+    writer.rollWriter();
+    updateArchivingInfo(writer.getLastWrittenRecordId(), logName, location, 
isArchival);
+  }
+
+  private void updateArchivingInfo(long recordId, LogName logName, String 
location,
+      boolean isArchival)
+      throws IOException {
+    RaftClientReply archiveLogReply = getClient().send(
+        () -> LogServiceProtoUtil.toArchiveLogRequestProto(logName, location, 
recordId, isArchival)
+            .toByteString());
+    LogServiceProtos.ArchiveLogReplyProto 
message=LogServiceProtos.ArchiveLogReplyProto
+        .parseFrom(archiveLogReply.getMessage().getContent());
+    if (message.hasException()) {
+      throw new IOException(message.getException().getErrorMsg());
+    }
+  }
+
+  private void sendChangeStateRequest(State state) throws IOException {
+    if (isArchival) {
+      getClient().send(
+          () -> 
LogServiceProtoUtil.toChangeStateRequestProto(LogName.of("Dummy"), state)
+              .toByteString());
+    }
+
+  }
+
+  private RaftClient getClient() throws IOException {
+    if (client == null) {
+      try {
+        RaftServer raftServer = server.get();
+        client = 
RaftClient.newBuilder().setRaftGroup(getGroupFromGroupId(raftServer, groupId))
+            .setClientId(ClientId.randomId())
+            .setProperties(raftServer.getProperties()).build();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+    return client;
+  }
+
+  private RaftGroup getGroupFromGroupId(RaftServer raftServer, RaftGroupId 
raftGroupId)
+      throws IOException {
+    List<RaftGroup> x = 
StreamSupport.stream(raftServer.getGroups().spliterator(), false)
+        .filter(group -> 
group.getGroupId().equals(raftGroupId)).collect(Collectors.toList());
+    if (x.size() == 1) {
+      return x.get(0);
+    } else {
+      throw new GroupMismatchException(x.size() + " are group found for group 
id:" + raftGroupId);
+    }
+  }
+
+  @Override public void notifyNotLeader(Collection<TransactionContext> 
pendingEntries)
+      throws IOException {
+    isNoMoreLeader = true;
 
 Review comment:
   As it is a read-only request (so it may not need to maintain a 
TransactionContext), I can simply interrupt the current thread in the Future 
instead of relying on a boolean variable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to