joshelser commented on a change in pull request #25: RATIS-588 LogStream StateMachine export
URL: https://github.com/apache/incubator-ratis/pull/25#discussion_r299659998
 
 

 ##########
 File path: ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
 ##########
 @@ -452,29 +510,194 @@ public void close() {
 
 
 
-  private CompletableFuture<Message> processCloseLog(LogServiceRequestProto 
logServiceRequestProto) {
-    CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+  private CompletableFuture<Message> processChangeState(LogServiceRequestProto 
logServiceRequestProto) {
+    LogServiceProtos.ChangeStateLogRequestProto changeState = 
logServiceRequestProto.getChangeState();
     // Need to check whether the file is opened if opened close it.
     // TODO need to handle exceptions while operating with files.
+
+    State targetState = State.valueOf(changeState.getState().name());
+    switch (targetState) {
+    case OPEN:
+      if (state != null) {
+        verifyState(State.OPEN, State.CLOSED);
+      }
+      break;
+    case CLOSED:
+      verifyState(State.OPEN);
+      break;
+    case ARCHIVED:
+      verifyState(State.ARCHIVING);
+      break;
+    case ARCHIVING:
+      verifyState(State.CLOSED);
+      break;
+    case DELETED:
+      verifyState(State.CLOSED);
+      break;
+    }
+    this.state = targetState;
     return CompletableFuture.completedFuture(Message
-      .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+        
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
   }
 
-
-
   private CompletableFuture<Message> processGetStateRequest(
       LogServiceRequestProto logServiceRequestProto) {
     GetStateRequestProto getState = logServiceRequestProto.getGetState();
-    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-        .toGetStateReplyProto(state == State.OPEN).toByteString()));
+    return CompletableFuture.completedFuture(Message
+        
.valueOf(LogServiceProtoUtil.toGetStateReplyProto(state).toByteString()));
   }
 
-  private Throwable verifyState(State state) {
-       if (this.state != state) {
-          return new IOException("Wrong state: " + this.state);
-        }
+  private Throwable verifyState(State... states) {
+    for (State state : states) {
+      if (this.state == state) {
         return null;
-   }
+      }
+    }
+    return new IOException("Wrong state: " + this.state);
+  }
+
+  private CompletableFuture<Message> updateArchiveLogInfo(
+      LogServiceRequestProto logServiceRequestProto) {
+    LogServiceProtos.ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
+    this.archiveLogName = 
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+    this.archiveLocation = archiveLog.getLocation();
+    this.lastArchivedIndex = archiveLog.getLastArchivedRaftIndex();
+    Throwable t = verifyState(State.ARCHIVING);
+    return CompletableFuture.completedFuture(
+        
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+  }
+
+  private CompletableFuture<Message> processArchiveLog(
+      LogServiceRequestProto logServiceRequestProto) {
+    LogServiceProtos.ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
+    LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+    String loc = null;
+    this.isArchival = !archiveLog.getIsExport();
+    if(isArchival) {
+      loc = archiveLocation;
+    }else{
+      loc = archiveLog.getLocation();
+    }
+    Throwable t = null;
+    if (loc == null) {
+      t = new IllegalArgumentException(isArchival ?
+          "Location for archive is not configured" :
+          "Location for export provided is null");
+      return CompletableFuture.completedFuture(
+          
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+    }
+    final String location = loc;
+    String archiveLocationForLog = 
LogServiceUtils.getArchiveLocationForLog(location, logName);
+    long recordId = archiveLog.getLastArchivedRaftIndex();
+    t = verifyState(State.CLOSED);
+    if (t == null) {
+      Callable<Boolean> callable = () -> {
+        sendChangeStateRequest(State.ARCHIVING);
+        updateArchivingInfo(recordId, logName, location, isArchival);
+        ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
+        writer.init(archiveLocationForLog, logName);
+        LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
+        reader.seek(0);
 
 Review comment:
   Should `reader.seek(0)` be `reader.seek(recordId)` and not `0`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to