Repository: incubator-ratis
Updated Branches:
  refs/heads/master 8fb860315 -> a0f19ceb2


RATIS-422: LogStateMachine refactoring

Signed-off-by: Josh Elser <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a0f19ceb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a0f19ceb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a0f19ceb

Branch: refs/heads/master
Commit: a0f19ceb2a70644ce31ce6d92ef228bb10127b07
Parents: 8fb8603
Author: Vladimir Rodionov <[email protected]>
Authored: Thu Nov 15 13:26:49 2018 -0800
Committer: Josh Elser <[email protected]>
Committed: Thu Nov 15 14:26:23 2018 -0800

----------------------------------------------------------------------
 .../ratis/logservice/api/LogStateMachine.java   | 398 -------------------
 .../logservice/server/LogStateMachine.java      | 398 +++++++++++++++++++
 .../logservice/worker/LogServiceWorker.java     |   2 +-
 .../logservice/LogServiceReadWriteBase.java     |   2 +-
 4 files changed, 400 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a0f19ceb/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
deleted file mode 100644
index c72e1fc..0000000
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.api;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.proto.LogServiceProtos.*;
-import org.apache.ratis.logservice.util.LogServiceProtoUtil;
-import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.server.impl.ServerState;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.server.storage.RaftLogIOException;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.StateMachineStorage;
-import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
-import org.apache.ratis.util.AutoCloseableLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LogStateMachine extends BaseStateMachine {
-  public static final Logger LOG = 
LoggerFactory.getLogger(LogStateMachine.class);
-
-  public static enum State {
-    OPEN, CLOSED
-  }
-
-  /*
-   *  State is a pair log's length and state (closed/open);
-   */
-
-  private long length;
-
-  private State state = State.OPEN;
-
-  private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
-
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-
-  private RaftLog log;
-
-  private RaftGroupId groupId;
-
-  private RaftServerProxy proxy ;
-
-  private AutoCloseableLock readLock() {
-    return AutoCloseableLock.acquire(lock.readLock());
-  }
-
-  private AutoCloseableLock writeLock() {
-    return AutoCloseableLock.acquire(lock.writeLock());
-  }
-
-  /**
-   * Reset state machine
-   */
-  void reset() {
-    this.length = 0;
-    setLastAppliedTermIndex(null);
-  }
-
-  @Override
-  public void initialize(RaftServer server, RaftGroupId groupId,
-      RaftStorage raftStorage) throws IOException {
-    super.initialize(server, groupId, raftStorage);
-    this.storage.init(raftStorage);
-    this.proxy = (RaftServerProxy) server;
-    this.groupId = groupId;
-    loadSnapshot(storage.getLatestSnapshot());
-  }
-
-  private void checkInitialization() throws IOException {
-    if (this.log == null) {
-      ServerState state = proxy.getImpl(groupId).getState();
-      this.log = state.getLog();
-    }
-  }
-
-  @Override
-  public void reinitialize() throws IOException {
-    close();
-    loadSnapshot(storage.getLatestSnapshot());
-  }
-
-  @Override
-  public long takeSnapshot() {
-    final TermIndex last;
-    try(final AutoCloseableLock readLock = readLock()) {
-      last = getLastAppliedTermIndex();
-    }
-
-    final File snapshotFile =  storage.getSnapshotFile(last.getTerm(), 
last.getIndex());
-    LOG.info("Taking a snapshot to file {}", snapshotFile);
-
-    try(final AutoCloseableLock readLock = readLock();
-        final ObjectOutputStream out = new ObjectOutputStream(
-        new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
-      out.writeLong(length);
-      out.writeObject(state);
-    } catch(IOException ioe) {
-      LOG.warn("Failed to write snapshot file \"" + snapshotFile
-          + "\", last applied index=" + last);
-    }
-
-    return last.getIndex();
-  }
-
-  private long loadSnapshot(SingleFileSnapshotInfo snapshot) throws 
IOException {
-    return load(snapshot, false);
-  }
-
-  private long load(SingleFileSnapshotInfo snapshot, boolean reload) throws 
IOException {
-    if (snapshot == null) {
-      LOG.warn("The snapshot info is null.");
-      return RaftServerConstants.INVALID_LOG_INDEX;
-    }
-    final File snapshotFile = snapshot.getFile().getPath().toFile();
-    if (!snapshotFile.exists()) {
-      LOG.warn("The snapshot file {} does not exist for snapshot {}", 
snapshotFile, snapshot);
-      return RaftServerConstants.INVALID_LOG_INDEX;
-    }
-
-    final TermIndex last = 
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
-    try(final AutoCloseableLock writeLock = writeLock();
-        final ObjectInputStream in = new ObjectInputStream(
-            new BufferedInputStream(new FileInputStream(snapshotFile)))) {
-      if (reload) {
-        reset();
-      }
-      setLastAppliedTermIndex(last);
-      this.length = in.readLong();
-      this.state = (State) in.readObject();
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException(e);
-    }
-    return last.getIndex();
-  }
-
-  @Override
-  public StateMachineStorage getStateMachineStorage() {
-    return storage;
-  }
-
-  @Override
-  public CompletableFuture<Message> query(Message request) {
-
-    try {
-
-      checkInitialization();
-      LogServiceRequestProto logServiceRequestProto =
-          LogServiceRequestProto.parseFrom(request.getContent());
-
-      switch (logServiceRequestProto.getRequestCase()) {
-
-        case READNEXTQUERY:
-          return processReadRequest(logServiceRequestProto);
-        case LENGTHQUERY:
-          return processGetLengthRequest(logServiceRequestProto);
-        case STARTINDEXQUERY:
-          return processGetStartIndexRequest(logServiceRequestProto);
-        case GETSTATE:
-          return processGetStateRequest(logServiceRequestProto);
-        case LASTINDEXQUERY:
-          return processGetLastCommittedIndexRequest(logServiceRequestProto);
-        default:
-          // TODO
-          throw new RuntimeException(
-            "Wrong message type for query: " + 
logServiceRequestProto.getRequestCase());
-      }
-
-    } catch (IOException e) {
-      // TODO exception handling
-      throw new RuntimeException(e);
-    }
-
-  }
-
-  /**
-   * Process get start index request
-   * @param msg message
-   * @return reply message
-   */
-  private CompletableFuture<Message>
-      processGetStartIndexRequest(LogServiceRequestProto proto)
-  {
-
-    Throwable t = verifyState(State.OPEN);
-    long startIndex = log.getStartIndex();
-    return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toGetLogStartIndexReplyProto(startIndex, 
t).toByteString()));
-  }
-
-  /**
-   * Process get last committed record index
-   * @param msg message
-   * @return reply message
-   */
-  private CompletableFuture<Message>
-      processGetLastCommittedIndexRequest(LogServiceRequestProto proto)
-  {
-
-    Throwable t = verifyState(State.OPEN);
-    long lastIndex = log.getLastCommittedIndex();
-    return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toGetLogLastIndexReplyProto(lastIndex, 
t).toByteString()));
-  }
-
-  /**
-   * Process get length request
-   * @param msg message
-   * @return reply message
-   */
-  private CompletableFuture<Message> 
processGetLengthRequest(LogServiceRequestProto proto) {
-    GetLogLengthRequestProto msgProto = proto.getLengthQuery();
-    Throwable t = verifyState(State.OPEN);
-    LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.length);
-    return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(this.length, 
t).toByteString()));
-  }
-
-  /**
-   * Process read log entries request
-   * @param msg message
-   * @return reply message
-   */
-  private CompletableFuture<Message> processReadRequest(LogServiceRequestProto 
proto) {
-
-    ReadLogRequestProto msgProto = proto.getReadNextQuery();
-    long startRecordId = msgProto.getStartRecordId();
-    int num = msgProto.getNumRecords();
-    Throwable t = verifyState(State.OPEN);
-    List<byte[]> list = new ArrayList<byte[]>();
-    LOG.info("Start Index: {}", startRecordId);
-    LOG.info("Total to read: {}", num);
-    if (t == null) {
-      for (long index = startRecordId; index < startRecordId + num; index++) {
-        try {
-          LogEntryProto entry = log.get(index);
-          LOG.info("Index: {} Entry: {}", index, entry);
-          if (entry == null || entry.hasConfigurationEntry()) {
-            continue;
-          }
-          //TODO: how to distinguish log records from
-          // DML commands logged by the service?
-          list.add(entry.getStateMachineLogEntry().getLogData().toByteArray());
-        } catch (RaftLogIOException e) {
-          t = e;
-          list = null;
-          break;
-        }
-      }
-    }
-    return CompletableFuture.completedFuture(
-      Message.valueOf(LogServiceProtoUtil.toReadLogReplyProto(list, 
t).toByteString()));
-  }
-
-  /**
-   * Process sync request
-   * @param trx transaction
-   * @param logMessage message
-   * @return reply message
-   */
-  private CompletableFuture<Message> processSyncRequest(TransactionContext trx,
-      LogServiceRequestProto logMessage) {
-     long index = trx.getLogEntry().getIndex();
-    // TODO: Do we really need this call?
-    return CompletableFuture.completedFuture(Message
-      .valueOf(LogServiceProtoUtil.toSyncLogReplyProto(index, 
null).toByteString()));
-
-  }
-
-  private CompletableFuture<Message> processAppendRequest(TransactionContext 
trx,
-      LogServiceRequestProto logProto) {
-
-    final LogEntryProto entry = trx.getLogEntry();
-    AppendLogEntryRequestProto proto = logProto.getAppendRequest();
-    final long index = entry.getIndex();
-    long total = 0;
-    Throwable t = verifyState(State.OPEN);
-    if (t == null) {
-      try (final AutoCloseableLock writeLock = writeLock()) {
-          List<byte[]> entries = 
LogServiceProtoUtil.toListByteArray(proto.getDataList());
-          for (byte[] bb : entries) {
-            total += bb.length;
-          }
-          this.length += total;
-          // TODO do we need this for other write request (close, sync)
-          updateLastAppliedTermIndex(entry.getTerm(), index);
-      }
-    }
-    List<Long> ids = new ArrayList<Long>();
-    ids.add(index);
-    final CompletableFuture<Message> f =
-        CompletableFuture.completedFuture(
-          Message.valueOf(LogServiceProtoUtil.toAppendLogReplyProto(ids, 
t).toByteString()));
-    final RaftProtos.RaftPeerRole role = trx.getServerRole();
-    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, 
length);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("{}-{}: variables={}", getId(), index, length);
-    }
-    return f;
-  }
-
-  @Override
-  public void close() {
-    reset();
-  }
-
-  @Override
-  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    try {
-      checkInitialization();
-      final LogEntryProto entry = trx.getLogEntry();
-      LogServiceRequestProto logServiceRequestProto =
-          
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
-      switch (logServiceRequestProto.getRequestCase()) {
-        case CLOSELOG:
-          return processCloseLog(logServiceRequestProto);
-        case APPENDREQUEST:
-          return processAppendRequest(trx, logServiceRequestProto);
-        case SYNCREQUEST:
-          return processSyncRequest(trx, logServiceRequestProto);
-        default:
-          //TODO
-          return null;
-      }
-    } catch (IOException e) {
-      // TODO exception handling
-      throw new RuntimeException(e);
-    }
-  }
-
-
-
-  private CompletableFuture<Message> processCloseLog(LogServiceRequestProto 
logServiceRequestProto) {
-    CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
-    LogName logName = LogServiceProtoUtil.toLogName(closeLog.getLogName());
-    // Need to check whether the file is opened if opened close it.
-    // TODO need to handle exceptions while operating with files.
-    return CompletableFuture.completedFuture(Message
-      .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
-  }
-
-
-
-  private CompletableFuture<Message> processGetStateRequest(
-      LogServiceRequestProto logServiceRequestProto) {
-    GetStateRequestProto getState = logServiceRequestProto.getGetState();
-    LogName logName = LogServiceProtoUtil.toLogName(getState.getLogName());
-    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-        .toGetStateReplyProto(state == State.OPEN).toByteString()));
-  }
-
-  private Throwable verifyState(State state) {
-       if (this.state != state) {
-          return new IOException("Wrong state: " + this.state);
-        }
-        return null;
-   }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a0f19ceb/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
new file mode 100644
index 0000000..2ee5a53
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.server;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.proto.LogServiceProtos.*;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LogStateMachine extends BaseStateMachine {
+  public static final Logger LOG = 
LoggerFactory.getLogger(LogStateMachine.class);
+
+  public static enum State {
+    OPEN, CLOSED
+  }
+
+  /*
+   *  State is a pair log's length and state (closed/open);
+   */
+
+  private long length;
+
+  private State state = State.OPEN;
+
+  private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+  private RaftLog log;
+
+  private RaftGroupId groupId;
+
+  private RaftServerProxy proxy ;
+
+  private AutoCloseableLock readLock() {
+    return AutoCloseableLock.acquire(lock.readLock());
+  }
+
+  private AutoCloseableLock writeLock() {
+    return AutoCloseableLock.acquire(lock.writeLock());
+  }
+
+  /**
+   * Reset state machine
+   */
+  void reset() {
+    this.length = 0;
+    setLastAppliedTermIndex(null);
+  }
+
+  @Override
+  public void initialize(RaftServer server, RaftGroupId groupId,
+      RaftStorage raftStorage) throws IOException {
+    super.initialize(server, groupId, raftStorage);
+    this.storage.init(raftStorage);
+    this.proxy = (RaftServerProxy) server;
+    this.groupId = groupId;
+    loadSnapshot(storage.getLatestSnapshot());
+  }
+
+  private void checkInitialization() throws IOException {
+    if (this.log == null) {
+      ServerState state = proxy.getImpl(groupId).getState();
+      this.log = state.getLog();
+    }
+  }
+
+  @Override
+  public void reinitialize() throws IOException {
+    close();
+    loadSnapshot(storage.getLatestSnapshot());
+  }
+
+  @Override
+  public long takeSnapshot() {
+    final TermIndex last;
+    try(final AutoCloseableLock readLock = readLock()) {
+      last = getLastAppliedTermIndex();
+    }
+
+    final File snapshotFile =  storage.getSnapshotFile(last.getTerm(), 
last.getIndex());
+    LOG.info("Taking a snapshot to file {}", snapshotFile);
+
+    try(final AutoCloseableLock readLock = readLock();
+        final ObjectOutputStream out = new ObjectOutputStream(
+        new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
+      out.writeLong(length);
+      out.writeObject(state);
+    } catch(IOException ioe) {
+      LOG.warn("Failed to write snapshot file \"" + snapshotFile
+          + "\", last applied index=" + last);
+    }
+
+    return last.getIndex();
+  }
+
+  private long loadSnapshot(SingleFileSnapshotInfo snapshot) throws 
IOException {
+    return load(snapshot, false);
+  }
+
+  private long load(SingleFileSnapshotInfo snapshot, boolean reload) throws 
IOException {
+    if (snapshot == null) {
+      LOG.warn("The snapshot info is null.");
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    }
+    final File snapshotFile = snapshot.getFile().getPath().toFile();
+    if (!snapshotFile.exists()) {
+      LOG.warn("The snapshot file {} does not exist for snapshot {}", 
snapshotFile, snapshot);
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    }
+
+    final TermIndex last = 
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
+    try(final AutoCloseableLock writeLock = writeLock();
+        final ObjectInputStream in = new ObjectInputStream(
+            new BufferedInputStream(new FileInputStream(snapshotFile)))) {
+      if (reload) {
+        reset();
+      }
+      setLastAppliedTermIndex(last);
+      this.length = in.readLong();
+      this.state = (State) in.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+    return last.getIndex();
+  }
+
+  @Override
+  public StateMachineStorage getStateMachineStorage() {
+    return storage;
+  }
+
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+
+    try {
+
+      checkInitialization();
+      LogServiceRequestProto logServiceRequestProto =
+          LogServiceRequestProto.parseFrom(request.getContent());
+
+      switch (logServiceRequestProto.getRequestCase()) {
+
+        case READNEXTQUERY:
+          return processReadRequest(logServiceRequestProto);
+        case LENGTHQUERY:
+          return processGetLengthRequest(logServiceRequestProto);
+        case STARTINDEXQUERY:
+          return processGetStartIndexRequest(logServiceRequestProto);
+        case GETSTATE:
+          return processGetStateRequest(logServiceRequestProto);
+        case LASTINDEXQUERY:
+          return processGetLastCommittedIndexRequest(logServiceRequestProto);
+        default:
+          // TODO
+          throw new RuntimeException(
+            "Wrong message type for query: " + 
logServiceRequestProto.getRequestCase());
+      }
+
+    } catch (IOException e) {
+      // TODO exception handling
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  /**
+   * Process get start index request
+   * @param msg message
+   * @return reply message
+   */
+  private CompletableFuture<Message>
+      processGetStartIndexRequest(LogServiceRequestProto proto)
+  {
+
+    Throwable t = verifyState(State.OPEN);
+    long startIndex = log.getStartIndex();
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toGetLogStartIndexReplyProto(startIndex, 
t).toByteString()));
+  }
+
+  /**
+   * Process get last committed record index
+   * @param msg message
+   * @return reply message
+   */
+  private CompletableFuture<Message>
+      processGetLastCommittedIndexRequest(LogServiceRequestProto proto)
+  {
+
+    Throwable t = verifyState(State.OPEN);
+    long lastIndex = log.getLastCommittedIndex();
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toGetLogLastIndexReplyProto(lastIndex, 
t).toByteString()));
+  }
+
+  /**
+   * Process get length request
+   * @param msg message
+   * @return reply message
+   */
+  private CompletableFuture<Message> 
processGetLengthRequest(LogServiceRequestProto proto) {
+    GetLogLengthRequestProto msgProto = proto.getLengthQuery();
+    Throwable t = verifyState(State.OPEN);
+    LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.length);
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(this.length, 
t).toByteString()));
+  }
+
+  /**
+   * Process read log entries request
+   * @param msg message
+   * @return reply message
+   */
+  private CompletableFuture<Message> processReadRequest(LogServiceRequestProto 
proto) {
+
+    ReadLogRequestProto msgProto = proto.getReadNextQuery();
+    long startRecordId = msgProto.getStartRecordId();
+    int num = msgProto.getNumRecords();
+    Throwable t = verifyState(State.OPEN);
+    List<byte[]> list = new ArrayList<byte[]>();
+    LOG.info("Start Index: {}", startRecordId);
+    LOG.info("Total to read: {}", num);
+    if (t == null) {
+      for (long index = startRecordId; index < startRecordId + num; index++) {
+        try {
+          LogEntryProto entry = log.get(index);
+          LOG.info("Index: {} Entry: {}", index, entry);
+          if (entry == null || entry.hasConfigurationEntry()) {
+            continue;
+          }
+          //TODO: how to distinguish log records from
+          // DML commands logged by the service?
+          list.add(entry.getStateMachineLogEntry().getLogData().toByteArray());
+        } catch (RaftLogIOException e) {
+          t = e;
+          list = null;
+          break;
+        }
+      }
+    }
+    return CompletableFuture.completedFuture(
+      Message.valueOf(LogServiceProtoUtil.toReadLogReplyProto(list, 
t).toByteString()));
+  }
+
+  /**
+   * Process sync request
+   * @param trx transaction
+   * @param logMessage message
+   * @return reply message
+   */
+  private CompletableFuture<Message> processSyncRequest(TransactionContext trx,
+      LogServiceRequestProto logMessage) {
+     long index = trx.getLogEntry().getIndex();
+    // TODO: Do we really need this call?
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toSyncLogReplyProto(index, 
null).toByteString()));
+
+  }
+
+  private CompletableFuture<Message> processAppendRequest(TransactionContext 
trx,
+      LogServiceRequestProto logProto) {
+
+    final LogEntryProto entry = trx.getLogEntry();
+    AppendLogEntryRequestProto proto = logProto.getAppendRequest();
+    final long index = entry.getIndex();
+    long total = 0;
+    Throwable t = verifyState(State.OPEN);
+    if (t == null) {
+      try (final AutoCloseableLock writeLock = writeLock()) {
+          List<byte[]> entries = 
LogServiceProtoUtil.toListByteArray(proto.getDataList());
+          for (byte[] bb : entries) {
+            total += bb.length;
+          }
+          this.length += total;
+          // TODO do we need this for other write request (close, sync)
+          updateLastAppliedTermIndex(entry.getTerm(), index);
+      }
+    }
+    List<Long> ids = new ArrayList<Long>();
+    ids.add(index);
+    final CompletableFuture<Message> f =
+        CompletableFuture.completedFuture(
+          Message.valueOf(LogServiceProtoUtil.toAppendLogReplyProto(ids, 
t).toByteString()));
+    final RaftProtos.RaftPeerRole role = trx.getServerRole();
+    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, 
length);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{}-{}: variables={}", getId(), index, length);
+    }
+    return f;
+  }
+
+  @Override
+  public void close() {
+    reset();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    try {
+      checkInitialization();
+      final LogEntryProto entry = trx.getLogEntry();
+      LogServiceRequestProto logServiceRequestProto =
+          
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
+      switch (logServiceRequestProto.getRequestCase()) {
+        case CLOSELOG:
+          return processCloseLog(logServiceRequestProto);
+        case APPENDREQUEST:
+          return processAppendRequest(trx, logServiceRequestProto);
+        case SYNCREQUEST:
+          return processSyncRequest(trx, logServiceRequestProto);
+        default:
+          //TODO
+          return null;
+      }
+    } catch (IOException e) {
+      // TODO exception handling
+      throw new RuntimeException(e);
+    }
+  }
+
+
+
+  private CompletableFuture<Message> processCloseLog(LogServiceRequestProto 
logServiceRequestProto) {
+    CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+    LogName logName = LogServiceProtoUtil.toLogName(closeLog.getLogName());
+    // Need to check whether the file is opened if opened close it.
+    // TODO need to handle exceptions while operating with files.
+    return CompletableFuture.completedFuture(Message
+      .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+  }
+
+
+
+  private CompletableFuture<Message> processGetStateRequest(
+      LogServiceRequestProto logServiceRequestProto) {
+    GetStateRequestProto getState = logServiceRequestProto.getGetState();
+    LogName logName = LogServiceProtoUtil.toLogName(getState.getLogName());
+    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
+        .toGetStateReplyProto(state == State.OPEN).toByteString()));
+  }
+
+  private Throwable verifyState(State state) {
+       if (this.state != state) {
+          return new IOException("Wrong state: " + this.state);
+        }
+        return null;
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a0f19ceb/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java
index a805448..e8fd895 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java
@@ -23,7 +23,7 @@ import com.beust.jcommander.Parameter;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.logservice.api.LogStateMachine;
+import org.apache.ratis.logservice.server.LogStateMachine;
 import org.apache.ratis.logservice.server.ManagementStateMachine;
 import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
 import org.apache.ratis.logservice.util.LogServiceUtils;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a0f19ceb/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index 9585915..fd71311 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -32,9 +32,9 @@ import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.api.LogReader;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
-import org.apache.ratis.logservice.api.LogStateMachine;
 import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.impl.LogStreamImpl;
+import org.apache.ratis.logservice.server.LogStateMachine;
 import org.apache.ratis.logservice.util.TestUtils;
 import org.apache.ratis.statemachine.StateMachine;
 import org.junit.After;

Reply via email to