RATIS-274. Read/Write-path of log stream state machine

Signed-off-by: Josh Elser <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/48d6a2a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/48d6a2a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/48d6a2a4

Branch: refs/heads/master
Commit: 48d6a2a421a8b44f4080bad0ef4c004cbf7b456d
Parents: 9774c5c
Author: Vladimir Rodionov <[email protected]>
Authored: Wed Oct 17 15:33:06 2018 -0700
Committer: Josh Elser <[email protected]>
Committed: Thu Oct 18 14:43:50 2018 -0400

----------------------------------------------------------------------
 .../ratis/logservice/LogServiceFactory.java     |   8 +-
 .../apache/ratis/logservice/api/LogMessage.java | 139 +--------
 .../apache/ratis/logservice/api/LogService.java |  23 +-
 .../logservice/api/LogServiceConfiguration.java |  88 ++++++
 .../ratis/logservice/api/LogStateMachine.java   | 293 ++++++++++++++-----
 .../apache/ratis/logservice/api/LogStream.java  |  28 +-
 .../logservice/api/LogStreamConfiguration.java  |  64 ----
 .../ratis/logservice/dummy/DummyLogService.java |  24 +-
 .../ratis/logservice/dummy/DummyLogStream.java  |  29 +-
 .../ratis/logservice/impl/BaseLogStream.java    |  85 ------
 .../ratis/logservice/impl/LogReaderImpl.java    | 155 ++++++++++
 .../ratis/logservice/impl/LogServiceImpl.java   |  57 ++--
 .../ratis/logservice/impl/LogStreamImpl.java    | 159 ++++++++++
 .../ratis/logservice/impl/LogWriterImpl.java    |  97 ++++++
 .../logservice/util/LogServiceProtoUtil.java    | 218 ++++++++++++--
 .../org/apache/ratis/logservice/util/Utils.java |  48 ---
 .../ratis/logservice/LogServiceBaseTest.java    |   7 +-
 .../ratis/logservice/api/TestApiExample.java    |   4 +-
 .../ratis/logservice/api/TestLogMessage.java    |  55 ----
 .../util/TestLogServiceProtoUtil.java           | 290 ++++++++++++++++++
 .../apache/ratis/logservice/util/TestUtils.java |  84 +-----
 ratis-proto/src/main/proto/Logservice.proto     |  97 ++++--
 .../ratis/server/impl/RaftServerProxy.java      |   2 +-
 23 files changed, 1438 insertions(+), 616 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
index 084188e..1b9966a 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
@@ -19,9 +19,13 @@ package org.apache.ratis.logservice;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.impl.LogServiceImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LogServiceFactory {
+  public static final Logger LOG = 
LoggerFactory.getLogger(LogServiceFactory.class);
   private static final LogServiceFactory INSTANCE = new LogServiceFactory();
 
   private LogServiceFactory() {}
@@ -31,8 +35,8 @@ public class LogServiceFactory {
    *
    * @param raftClient The client to a Raft quorum.
    */
-  public LogService createLogService(RaftClient raftClient) {
-    return new LogServiceImpl(raftClient);
+  public LogService createLogService(RaftClient raftClient, 
LogServiceConfiguration config) {
+    return new LogServiceImpl(raftClient, config);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
index cfc53a5..3d3bb56 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java
@@ -17,159 +17,34 @@
  */
 package org.apache.ratis.logservice.api;
 
-import java.nio.charset.Charset;
-
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.proto.logservice.LogServiceProtos;
 
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+public abstract class LogMessage implements Message {
 
-public class LogMessage implements Message {
-  public static final Charset UTF8 = Charset.forName("UTF-8");
   /*
-   * Type of message
+   * Type of messages
    */
   public static enum Type {
-    READ_REQUEST, READ_REPLY, WRITE
+    APPEND, CLOSE, SYNC, GET_START_INDEX, READ, GET_LENGTH
   }
 
   /*
-   * For all READ and WRITE requests
-   */
-  private final LogName name;
-
-  /*
-   * Set only for READ_REPLY response
-   */
-  private long  length;
-
-  /*
-   * Pay load for WRITE request
-   */
-  private byte[] data;
-
-  /*
-   * Type of message
-   */
-
-  private Type type;
-
-  /**
-   * Constructor for WRITE request
-   * @param logName name of a log
-   * @param data  pay load data
-   */
-  public LogMessage(LogName logName, byte[] data) {
-    this.name = logName;
-    this.data = data;
-    this.type = Type.WRITE;
-  }
-
-  /**
-   * Constructor for READ reply
-   * @param logName name of a log
-   * @param length length of a log
-   */
-  public LogMessage(LogName logName, long length) {
-    this.name = logName;
-    this.length = length;
-    this.type = Type.READ_REPLY;
-  }
-
-  /**
-   * Constructor for READ request
-   * @param logName name of a log
+   * Log name
    */
-  public LogMessage(LogName logName) {
-    this.name = logName;
-    this.type = Type.READ_REQUEST;
-  }
-
-  public static LogMessage parseFrom(ByteString data)
-      throws InvalidProtocolBufferException {
-    LogServiceProtos.LogMessage msg = 
LogServiceProtos.LogMessage.parseFrom(data);
-    LogServiceProtos.MessageType type = msg.getType();
-    long length = 0;
-    LogName name = null;
-    byte[] bdata = null;
-    name = LogName.of(msg.getLogName());
-    switch (type) {
-      case READ_REPLY:
-        length = msg.getLength();
-        return new LogMessage(name, length);
-      case READ_REQUEST:
-        return new LogMessage(name);
-      case WRITE:
-        bdata = msg.getData().toByteArray();
-        return new LogMessage(name, bdata);
-      default:
-        //TODO replace exception
-        throw new RuntimeException("Wrong message type: "+ type);
-    }
-  }
-
+  protected LogName logName;
 
   /**
    * Get log name
    * @return log name
    */
   public LogName getLogName() {
-    return name;
-  }
-
-  /**
-   * Get log length
-   * @return log length
-   */
-  public long getLength() {
-    return length;
-  }
-
-  /**
-   * Get log message data
-   * @return data
-   */
-  public byte[] getData() {
-    return data;
+    return logName;
   }
 
   /**
    * Get message type
    * @return message type
    */
-  public Type getType() {
-    return type;
-  }
+  public abstract Type getType();
 
-
-  @SuppressWarnings("deprecation")
-  @Override
-  public ByteString getContent() {
-    LogServiceProtos.LogMessage.Builder builder = 
LogServiceProtos.LogMessage.newBuilder();
-    builder.setType(LogServiceProtos.MessageType.valueOf(type.ordinal()));
-    builder.setLogName(name.getName());
-    switch (type) {
-      case READ_REPLY:
-        builder.setLength(length);
-        break;
-      case WRITE:
-        builder.setData(ByteString.copyFrom(data));
-        break;
-      default:
-    }
-
-    return builder.build().toByteString();
-  }
-
-  @Override
-  public String toString() {
-    String s = type.name() + " : log=" + name.getName();
-    if (type == Type.READ_REPLY) {
-      s += " len=" + length;
-    } else if (type == Type.WRITE) {
-      s += " data len=" + data.length;
-    }
-    return s;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
index c78ed2a..de47c62 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.logservice.api;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.logservice.api.LogStream.State;
 
 /**
@@ -46,7 +47,7 @@ public interface LogService extends AutoCloseable {
    * @param name Unique name for this LogStream.
    * @param config Configuration object for this LogStream
    */
-  LogStream createLog(LogName name, LogStreamConfiguration config) throws 
IOException;
+  LogStream createLog(LogName name, LogServiceConfiguration config) throws 
IOException;
 
   /*
    * How to get LogStreams that already exist
@@ -107,7 +108,7 @@ public interface LogService extends AutoCloseable {
    *
    * @param config The new configuration object
    */
-  void updateConfiguration(LogName name, LogStreamConfiguration config);
+  void updateConfiguration(LogName name, LogServiceConfiguration config);
 
   /**
    * Registers a {@link RecordListener} with the log which will receive all 
records written using
@@ -129,11 +130,27 @@ public interface LogService extends AutoCloseable {
    *
    * @param the log's name
    * @param listener The listener to remove
+   * @return true if the listener was removed, false otherwise
    */
-  void removeRecordListener(LogName name, RecordListener listener);
+  boolean removeRecordListener(LogName name, RecordListener listener);
 
   /**
    * Overrides {@link #close()} in {@link AutoCloseable} to throw an 
IOException.
    */
+  @Override
   void close() throws IOException;
+
+  /**
+   * Gets Raft client object
+   * @return raft client object
+   */
+  RaftClient getRaftClient();
+
+  /**
+   * Gets configuration
+   * @return configuration
+   */
+  LogServiceConfiguration getConfiguration();
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceConfiguration.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceConfiguration.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceConfiguration.java
new file mode 100644
index 0000000..78119e5
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceConfiguration.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.api;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An encapsulation of configuration for a LogService.
+ */
+public class LogServiceConfiguration {
+
+  private ConcurrentHashMap<String, String> configMap =
+      new ConcurrentHashMap<String, String>();
+
+  /**
+   * Ctor
+   */
+  public LogServiceConfiguration() {
+  }
+
+  /**
+   * Fetches the value for the given key from the configuration. If there is 
no entry for
+   * the given key, {@code null} is returned.
+   *
+   * @param key The configuration key
+   */
+  public String get(String key) {
+    return configMap.get(key);
+  }
+
+  /**
+   * Sets the given key and value into this configuration. The configuration 
key may
+   * not be null. The value must be non-null too; use remove(String) to drop a key.
+   *
+   * @param key Configuration key, must be non-null
+   * @param value Configuration value
+   */
+  public void set(String key, String value) {
+    configMap.put(key,  value);
+  }
+
+  /**
+   * Removes any entry with the given key from the configuration. If there is 
no entry
+   * for the given key, this method returns without error. The provided key 
must be
+   * non-null.
+   *
+   * @param key The configuration key, must be non-null
+   * @return value
+   */
+  public String remove(String key) {
+    return configMap.remove(key);
+  }
+
+  /**
+   * Sets the collection of key-value pairs into the configuration. This is 
functionally
+   * equivalent to calling {@link #set(String, String)} numerous times.
+   */
+  public void setMany(Iterable<Entry<String,String>> entries) {
+    for (Entry<String, String> entry: entries ) {
+      configMap.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Returns an unmodifiable view over the configuration as a {@code Map}.
+   */
+  public Map<String,String> asMap() {
+    return Collections.unmodifiableMap(configMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
index 4bfa294..1a3edc2 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
@@ -17,17 +17,28 @@
  */
 package org.apache.ratis.logservice.api;
 
-import org.apache.ratis.logservice.api.LogStream.State;
-import org.apache.ratis.logservice.impl.BaseLogStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ratis.logservice.impl.LogStreamImpl;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto;
@@ -35,35 +46,48 @@ import 
org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogRequestProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftLogIOException;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.util.AutoCloseableLock;
-import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LogStateMachine extends BaseStateMachine {
+  public static final Logger LOG = 
LoggerFactory.getLogger(LogStateMachine.class);
+  /*
+   *  State
+   */
   private final Map<LogName, Long> state = new ConcurrentHashMap<>();
 
   private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
+  private RaftLog log;
+
+  private RaftGroupId groupId;
+
+  private RaftServerProxy proxy ;
+
   private AutoCloseableLock readLock() {
     return AutoCloseableLock.acquire(lock.readLock());
   }
@@ -72,6 +96,9 @@ public class LogStateMachine extends BaseStateMachine {
     return AutoCloseableLock.acquire(lock.writeLock());
   }
 
+  /**
+   * Reset state machine
+   */
   void reset() {
     state.clear();
     setLastAppliedTermIndex(null);
@@ -82,9 +109,18 @@ public class LogStateMachine extends BaseStateMachine {
       RaftStorage raftStorage) throws IOException {
     super.initialize(server, groupId, raftStorage);
     this.storage.init(raftStorage);
+    this.proxy = (RaftServerProxy) server;
+    this.groupId = groupId;
     loadSnapshot(storage.getLatestSnapshot());
   }
 
+  private void checkInitialization() throws IOException {
+    if (this.log == null) {
+      ServerState state = proxy.getImpl(groupId).getState();
+      this.log = state.getLog();
+    }
+  }
+
   @Override
   public void reinitialize() throws IOException {
     close();
@@ -152,26 +188,147 @@ public class LogStateMachine extends BaseStateMachine {
 
   @Override
   public CompletableFuture<Message> query(Message request) {
-    LogMessage msg = null;
+
     try {
-      msg = LogMessage.parseFrom(request.getContent());
-      LogName logName = msg.getLogName();
-      Long len = null;
-      try(final AutoCloseableLock readLock = readLock()) {
-        len = state.get(logName);
-        if (len == null) {
-          len = new Long(-1);
-        }
+
+      checkInitialization();
+      LogServiceRequestProto logServiceRequestProto =
+          LogServiceRequestProto.parseFrom(request.getContent());
+
+      switch (logServiceRequestProto.getRequestCase()) {
+
+        case READNEXTQUERY:
+          return processReadRequest(logServiceRequestProto);
+        case LENGTHQUERY:
+          return processGetLengthRequest(logServiceRequestProto);
+        case STARTINDEXQUERY:
+          return processGetStartIndexRequest(logServiceRequestProto);
+        case LISTLOGS:
+          return processListLogsRequest();
+        case GETLOG:
+          return processGetLogRequest(logServiceRequestProto);
+        case GETSTATE:
+          return processGetStateRequest(logServiceRequestProto);
+        default:
+          // TODO
+          throw new RuntimeException(
+            "Wrong message type for query: " + 
logServiceRequestProto.getRequestCase());
       }
-      LOG.debug("QUERY: {}, RESULT: {}", msg, len);
-      return CompletableFuture.completedFuture(new LogMessage (logName, len));
-    } catch (InvalidProtocolBufferException e) {
-      //TODO exception handling
+
+    } catch (IOException e) {
+      // TODO exception handling
       throw new RuntimeException(e);
     }
 
   }
 
+  /**
+   * Process get start index request
+   * @param proto request message
+   * @return reply message
+   */
+  private CompletableFuture<Message>
+      processGetStartIndexRequest(LogServiceRequestProto proto)
+  {
+    long startIndex =log.getStartIndex();
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toGetLogStartIndexReplyProto(startIndex, 
null).toByteString()));
+  }
+
+  /**
+   * Process get length request
+   * @param proto request message
+   * @return reply message
+   */
+  private CompletableFuture<Message> 
processGetLengthRequest(LogServiceRequestProto proto) {
+    Long len = null;
+    GetLogLengthRequestProto msgProto = proto.getLengthQuery();
+    LogName logName = LogName.of(msgProto.getLogName().getName());
+    try(final AutoCloseableLock readLock = readLock()) {
+      len = state.get(logName);
+      if (len == null) {
+        len = new Long(-1);
+      }
+    }
+    LOG.debug("QUERY: {}, RESULT: {}", msgProto, len);
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(len, 
null).toByteString()));
+  }
+
+  /**
+   * Process read log entries request
+   * @param proto request message
+   * @return reply message
+   */
+  private CompletableFuture<Message> processReadRequest(LogServiceRequestProto 
proto) {
+
+    ReadLogRequestProto msgProto = proto.getReadNextQuery();
+    long startRecordId = msgProto.getStartRecordId();
+    int num = msgProto.getNumRecords();
+    Throwable t = null;
+    List<byte[]> list = new ArrayList<byte[]>();
+    for (long index = startRecordId; index < startRecordId + num; index++) {
+      try {
+        
list.add(log.getEntryWithData(index).getEntry().getStateMachineLogEntry().getLogData().toByteArray());
+      } catch(RaftLogIOException e) {
+        t = e;
+        list = null;
+        break;
+      }
+    }
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toReadLogReplyProto(list, 
t).toByteString()));
+  }
+
+  /**
+   * Process sync request
+   * @param trx transaction
+   * @param logMessage message
+   * @return reply message
+   */
+  private CompletableFuture<Message> processSyncRequest(TransactionContext trx,
+      LogServiceRequestProto logMessage) {
+
+    // TODO
+    return CompletableFuture.completedFuture(Message
+      .valueOf(LogServiceProtoUtil.toSyncLogReplyProto(null).toByteString()));
+
+  }
+
+  private CompletableFuture<Message> processAppendRequest(TransactionContext 
trx,
+      LogServiceRequestProto logProto) {
+
+    final LogEntryProto entry = trx.getLogEntry();
+    AppendLogEntryRequestProto proto = logProto.getAppendRequest();
+    final long index = entry.getIndex();
+    Long val = null;
+    LogName name = null;
+    long total = 0;
+    try (final AutoCloseableLock writeLock = writeLock()) {
+      name = LogServiceProtoUtil.toLogName(proto.getLogName());
+      List<byte[]> entries = 
LogServiceProtoUtil.toListByteArray(proto.getDataList());
+      for (byte[] bb : entries) {
+        total += bb.length;
+      }
+      val = state.get(name);
+      if (val == null) {
+        val = new Long(0);
+      }
+      state.put(name, val + total);
+      // TODO do we need this for other write request (close, sync)
+      updateLastAppliedTermIndex(entry.getTerm(), index);
+    }
+    final CompletableFuture<Message> f =
+        // TODO record ids?
+        CompletableFuture.completedFuture(Message
+          .valueOf(LogServiceProtoUtil.toAppendLogReplyProto(null, 
null).toByteString()));
+    final RaftProtos.RaftPeerRole role = trx.getServerRole();
+    LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, val);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{}-{}: variables={}", getId(), index, state);
+    }
+    return f;
+  }
 
   @Override
   public void close() {
@@ -181,54 +338,29 @@ public class LogStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     try {
+      checkInitialization();
       final LogEntryProto entry = trx.getLogEntry();
       LogServiceRequestProto logServiceRequestProto =
           
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
-      CompletableFuture<Message> f = null;
       switch (logServiceRequestProto.getRequestCase()) {
-       case LOGMESSAGE:
-        org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage 
logMessage2 = logServiceRequestProto.getLogMessage();
-        final LogMessage logMessage = 
LogMessage.parseFrom((entry.getStateMachineLogEntry().getLogData()));
-
-        final long index = entry.getIndex();
-        Long val = null;
-        LogName name = null;
-        try (final AutoCloseableLock writeLock = writeLock()) {
-          name = logMessage.getLogName();
-          long dataLength = logMessage.getData().length;
-          val = state.get(name);
-          if (val == null) {
-            val = new Long(0);
-          }
-          state.put(name, val + dataLength);
-          updateLastAppliedTermIndex(entry.getTerm(), index);
-        }
-        f =
-            CompletableFuture.completedFuture(new LogMessage(name, val));
-        final RaftProtos.RaftPeerRole role = trx.getServerRole();
-        LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, 
logMessage, val);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{}-{}: variables={}", getId(), index, state);
-        }
-        return f;
-      case CREATELOG:
-        return processCreateLogRequest(logServiceRequestProto);
-      case LISTLOGS:
-        return processListLogsRequest();
-      case GETLOG:
-        return processGetLogRequest(logServiceRequestProto);
-      case GETSTATE:
-        return processGetStateRequest(logServiceRequestProto);
-      case ARCHIVELOG:
-        return processArchiveLog(logServiceRequestProto);
-      case CLOSELOG:
-        return processCloseLog(logServiceRequestProto);
-      case DELETELOG:
-        return processDeleteLog(logServiceRequestProto);
-      default:
-        return null;
+
+        case CREATELOG:
+          return processCreateLogRequest(logServiceRequestProto);
+        case ARCHIVELOG:
+          return processArchiveLog(logServiceRequestProto);
+        case CLOSELOG:
+          return processCloseLog(logServiceRequestProto);
+        case DELETELOG:
+          return processDeleteLog(logServiceRequestProto);
+        case APPENDREQUEST:
+          return processAppendRequest(trx, logServiceRequestProto);
+        case SYNCREQUEST:
+          return processSyncRequest(trx, logServiceRequestProto);
+        default:
+          //TODO
+          return null;
       }
-    } catch (InvalidProtocolBufferException e) {
+    } catch (IOException e) {
       // TODO exception handling
       throw new RuntimeException(e);
     }
@@ -285,26 +417,31 @@ public class LogStateMachine extends BaseStateMachine {
       }
       state.put(name, val);
     }
+    //TODO This can't be part of a state machine (REMOVE)
     return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-        .toCreateLogReplyProto(new BaseLogStream(name, State.OPEN, 
val)).toByteString()));
+        .toCreateLogReplyProto(
+          new LogStreamImpl(name, null, new 
LogServiceConfiguration())).toByteString()));
   }
 
+  //TODO REMOVE this code
   private CompletableFuture<Message> processListLogsRequest() {
     List<LogStream> logStreams = new ArrayList<LogStream>(state.size());
     for (Entry<LogName, Long> e : state.entrySet()) {
-      logStreams.add(new BaseLogStream(e.getKey(), State.OPEN, e.getValue()));
+      logStreams.add(new LogStreamImpl(e.getKey(), null, new 
LogServiceConfiguration()));
     }
     return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
         .toListLogLogsReplyProto(logStreams).toByteString()));
   }
 
+  //TODO REMOVE this code
+
   private CompletableFuture<Message> processGetLogRequest(
       LogServiceRequestProto logServiceRequestProto) {
     GetLogRequestProto getLog = logServiceRequestProto.getGetLog();
     LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName());
     if (state.containsKey(logName)) {
       return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-          .toGetLogReplyProto(new BaseLogStream(logName, State.OPEN, 
state.get(logName)))
+          .toGetLogReplyProto(new LogStreamImpl(logName, null, new 
LogServiceConfiguration()))
           .toByteString()));
     } else {
       return 
CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder()

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
index fea213e..015b90c 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
@@ -17,13 +17,13 @@
  */
 package org.apache.ratis.logservice.api;
 
-import java.io.IOException;
+import java.util.Collection;
 import java.util.Set;
 
 /**
  * A distributed log with "infinite" length that supports reads and writes.
  */
-public interface LogStream {
+public interface LogStream extends AutoCloseable{
 
   /**
    * An enumeration that defines the current state of a LogStream
@@ -70,10 +70,30 @@ public interface LogStream {
   /**
    * Returns all {@link RecordListeners} for this LogStream.
    */
-  Set<RecordListener> getRecordListeners();
+  Collection<RecordListener> getRecordListeners();
 
   /**
    * Returns a copy of the Configuration for this LogStream.
    */
-  LogStreamConfiguration getConfiguration();
+  LogServiceConfiguration getConfiguration();
+
+  /**
+   * Adds a record listener to this LogStream.
+   * @param listener the listener to add
+   */
+  void addRecordListener(RecordListener listener);
+
+
+  /**
+   * Removes a record listener from this LogStream.
+   * @param listener the listener to remove
+   * @return true if the listener was removed, false otherwise
+   */
+  boolean removeRecordListener (RecordListener listener);
+
+  /**
+   * Get log service
+   * @return log service instance
+   */
+  LogService getLogService();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java
deleted file mode 100644
index 12aa030..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.api;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * An encapsulation of configuration for a LogStream.
- */
-public interface LogStreamConfiguration {
-
-  /**
-   * Fetches the value for the given key from the configuration. If there is 
no entry for
-   * the given key, {@code null} is returned.
-   *
-   * @param key The configuration key
-   */
-  String get(String key);
-
-  /**
-   * Sets the given key and value into this configuration. The configuration 
key may
-   * not be null. A null value removes the key from the configuration.
-   *
-   * @param key Configuration key, must be non-null
-   * @param value Configuration value
-   */
-  void set(String key, String value);
-
-  /**
-   * Removes any entry with the given key from the configuration. If there is 
no entry
-   * for the given key, this method returns without error. The provided key 
must be
-   * non-null.
-   *
-   * @param key The configuration key, must be non-null
-   */
-  void remove(String key);
-
-  /**
-   * Sets the collection of key-value pairs into the configuration. This is 
functionally
-   * equivalent to calling {@link #set(String, String)} numerous time.
-   */
-  void setMany(Iterable<Entry<String,String>> entries);
-
-  /**
-   * Returns an immutable view over the configuration as a {@code Map}.
-   */
-  Map<String,String> asMap();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
index 691556a..4393e70 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java
@@ -24,11 +24,12 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.api.LogService;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
-import org.apache.ratis.logservice.api.LogStreamConfiguration;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.RecordListener;
 
 public class DummyLogService implements LogService {
@@ -40,7 +41,7 @@ public class DummyLogService implements LogService {
   }
 
   @Override
-  public LogStream createLog(LogName name, LogStreamConfiguration config) {
+  public LogStream createLog(LogName name, LogServiceConfiguration config) {
     return new DummyLogStream(this, name);
   }
 
@@ -65,7 +66,7 @@ public class DummyLogService implements LogService {
 
   @Override public void deleteLog(LogName name) {}
 
-  @Override public void updateConfiguration(LogName name, 
LogStreamConfiguration config) {}
+  @Override public void updateConfiguration(LogName name, 
LogServiceConfiguration config) {}
 
   @Override public void addRecordListener(LogName name, RecordListener 
listener) {
     recordListeners.compute(name, (key, currentValue) -> {
@@ -77,16 +78,29 @@ public class DummyLogService implements LogService {
     });
   }
 
-  @Override public void removeRecordListener(LogName name, RecordListener 
listener) {
-    recordListeners.compute(name, (key, currentValue) -> {
+  @Override public boolean removeRecordListener(LogName name, RecordListener 
listener) {
+    Set<RecordListener> result = recordListeners.compute(name, (key, 
currentValue) -> {
       if (currentValue == null) {
         return null;
       }
       currentValue.remove(listener);
       return currentValue;
     });
+    return result.size() > 0;
   }
 
   @Override public void close() throws IOException {}
 
+  @Override
+  public RaftClient getRaftClient() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public LogServiceConfiguration getConfiguration() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
index c52a160..a931951 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java
@@ -23,8 +23,9 @@ import java.util.Set;
 
 import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.api.LogReader;
+import org.apache.ratis.logservice.api.LogService;
 import org.apache.ratis.logservice.api.LogStream;
-import org.apache.ratis.logservice.api.LogStreamConfiguration;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.api.RecordListener;
 
@@ -77,7 +78,31 @@ public class DummyLogStream implements LogStream {
   }
 
   @Override
-  public LogStreamConfiguration getConfiguration() {
+  public LogServiceConfiguration getConfiguration() {
+    return null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void addRecordListener(RecordListener listener) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public boolean removeRecordListener(RecordListener listener) {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public LogService getLogService() {
+    // TODO Auto-generated method stub
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java
deleted file mode 100644
index 4fe830e..0000000
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.impl;
-
-import java.util.Set;
-
-import org.apache.ratis.logservice.api.LogName;
-import org.apache.ratis.logservice.api.LogReader;
-import org.apache.ratis.logservice.api.LogStream;
-import org.apache.ratis.logservice.api.LogStreamConfiguration;
-import org.apache.ratis.logservice.api.LogWriter;
-import org.apache.ratis.logservice.api.RecordListener;
-
-public class BaseLogStream implements LogStream {
-  private LogName logName;
-  private State state;
-  private long size;
-
-  public BaseLogStream(LogName logName, State state, long size) {
-    this.logName = logName;
-    this.state = state;
-    this.size = size;
-  }
-
-  @Override
-  public LogName getName() {
-    return logName;
-  }
-
-  @Override
-  public State getState() {
-    return state;
-  }
-
-  @Override
-  public long getSize() {
-    return size;
-  }
-
-  @Override
-  public LogReader createReader() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public LogWriter createWriter() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public long getLastRecordId() {
-    // TODO Auto-generated method stub
-    return 0;
-  }
-
-  @Override
-  public Set<RecordListener> getRecordListeners() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public LogStreamConfiguration getConfiguration() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
new file mode 100644
index 0000000..ae4f2de
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.logservice.api.LogReader;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogReplyProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LogReaderImpl implements LogReader {
+  public static final Logger LOG = 
LoggerFactory.getLogger(LogReaderImpl.class);
+
+  /*
+   * Parent log stream
+   */
+  private LogStream parent;
+  /*
+   * Raft client
+   */
+  private RaftClient   raftClient;
+  /*
+   * Log service configuration object
+   */
+  private LogServiceConfiguration config;
+
+  /*
+   * Id of the next record to read; set by seek() and advanced by each read
+   */
+  long currentRecordId;
+
+  public LogReaderImpl(LogStream logStream) {
+    this.parent = logStream;
+    this.raftClient = logStream.getLogService().getRaftClient();
+    this.config = logStream.getConfiguration();
+  }
+
+  @Override
+  public void seek(long recordId) throws IOException {
+    this.currentRecordId = recordId;
+  }
+
+  @Override
+  public ByteBuffer readNext() throws IOException {
+    int num = 1;
+    RaftClientReply reply =
+        raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+            .toReadLogRequestProto(parent.getName(), currentRecordId, num)
+            .toByteString()));
+    ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
+    if (proto.hasException()) {
+      LogServiceException e = proto.getException();
+      throw new IOException(e.getErrorMsg());
+    }
+    proto.getLogRecord(0);
+    currentRecordId++;
+    return ByteBuffer.wrap(proto.getLogRecord(0).toByteArray());
+  }
+
+  @Override
+  public void readNext(ByteBuffer buffer) throws IOException {
+    int num = 1;
+    RaftClientReply reply =
+        raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+            .toReadLogRequestProto(parent.getName(), currentRecordId, num)
+            .toByteString()));
+    ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
+    if (proto.hasException()) {
+      LogServiceException e = proto.getException();
+      throw new IOException(e.getErrorMsg());
+    }
+    currentRecordId++;
+    //TODO check the record fits in buffer.remaining() before calling put()
+    buffer.put(proto.getLogRecord(0).toByteArray());
+  }
+
+  @Override
+  public List<ByteBuffer> readBulk(int numRecords) throws IOException {
+    RaftClientReply reply =
+        raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+            .toReadLogRequestProto(parent.getName(), currentRecordId, 
numRecords)
+            .toByteString()));
+    ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
+    if (proto.hasException()) {
+      LogServiceException e = proto.getException();
+      throw new IOException(e.getErrorMsg());
+    }
+    //TODO advance by proto.getLogRecordCount(), not numRecords, on a short read
+    currentRecordId+= numRecords;
+    List<ByteBuffer> ret = new ArrayList<ByteBuffer>();
+    int n = proto.getLogRecordCount();
+    for(int i=0; i < n; i++) {
+      ret.add(ByteBuffer.wrap(proto.getLogRecord(i).toByteArray()));
+    }
+    return ret;
+  }
+
+  @Override
+  public int readBulk(List<ByteBuffer> buffers) throws IOException {
+    RaftClientReply reply =
+        raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+            .toReadLogRequestProto(parent.getName(), currentRecordId, 
buffers.size())
+            .toByteString()));
+    ReadLogReplyProto proto = 
ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
+    if (proto.hasException()) {
+      LogServiceException e = proto.getException();
+      throw new IOException(e.getErrorMsg());
+    }
+    //TODO correct current record
+    int n = proto.getLogRecordCount();
+    currentRecordId += n;
+    for(int i=0; i< n; i++) {
+      buffers.get(i).put(proto.getLogRecord(i).toByteArray());
+    }
+    return n;
+  }
+
+  @Override
+  public long getPosition() {
+    return currentRecordId;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // TODO Auto-generated method stub
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
index 407e498..681a4ab 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
@@ -22,12 +22,11 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
-import org.apache.ratis.logservice.api.LogStreamConfiguration;
 import org.apache.ratis.logservice.api.RecordListener;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto;
@@ -45,9 +44,11 @@ import org.apache.ratis.protocol.RaftClientReply;
 public class LogServiceImpl implements LogService {
 
   final private RaftClient raftClient;
+  final private LogServiceConfiguration config;
 
-  public LogServiceImpl(RaftClient raftClient) {
+  public LogServiceImpl(RaftClient raftClient, LogServiceConfiguration config) 
{
     this.raftClient = raftClient;
+    this.config = config;
   }
 
   @Override
@@ -56,32 +57,28 @@ public class LogServiceImpl implements LogService {
         
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name)
             .toByteString()));
     CreateLogReplyProto parseFrom = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
-    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream());
+    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
   }
 
-  @Override
-  public LogStream createLog(LogName name, LogStreamConfiguration config) 
throws IOException {
-    // TODO need to make changes in the create log request to pass config.
-    return createLog(name);
-  }
+
 
   @Override
   public LogStream getLog(LogName name) throws IOException {
     RaftClientReply reply =
-        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name)
+        
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name)
             .toByteString()));
     GetLogReplyProto parseFrom = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
-    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream());
+    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
   }
 
   @Override
   public Iterator<LogStream> listLogs() throws IOException {
     RaftClientReply reply =
         raftClient
-            
.send(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString()));
+            
.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString()));
     ListLogsReplyProto parseFrom = 
ListLogsReplyProto.parseFrom(reply.getMessage().getContent());
     List<LogStreamProto> logStremsList = parseFrom.getLogStremsList();
-    return LogServiceProtoUtil.toListLogStreams(logStremsList).iterator();
+    return LogServiceProtoUtil.toListLogStreams(logStremsList, 
this).iterator();
   }
 
   @Override
@@ -95,7 +92,7 @@ public class LogServiceImpl implements LogService {
   @Override
   public State getState(LogName name) throws IOException {
     RaftClientReply reply =
-        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name)
+        
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name)
             .toByteString()));
     GetStateReplyProto parseFrom = 
GetStateReplyProto.parseFrom(reply.getMessage().getContent());
     return parseFrom.getState() == LogStreamState.OPEN ? State.OPEN : 
State.CLOSED;
@@ -118,28 +115,50 @@ public class LogServiceImpl implements LogService {
     DeleteLogReplyProto parseFrom = 
DeleteLogReplyProto.parseFrom(reply.getMessage().getContent());
   }
 
+
   @Override
-  public void updateConfiguration(LogName name, LogStreamConfiguration config) 
{
+  public void addRecordListener(LogName name, RecordListener listener) {
     // TODO Auto-generated method stub
 
   }
 
   @Override
-  public void addRecordListener(LogName name, RecordListener listener) {
+  public boolean removeRecordListener(LogName name, RecordListener listener) {
     // TODO Auto-generated method stub
-
+    return false;
   }
 
   @Override
-  public void removeRecordListener(LogName name, RecordListener listener) {
+  public void close() throws IOException {
     // TODO Auto-generated method stub
 
   }
 
   @Override
-  public void close() throws IOException {
+  public LogStream createLog(LogName name, LogServiceConfiguration config) 
throws IOException {
+    //TODO configuration
+    RaftClientReply reply =
+        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name)
+            .toByteString()));
+    CreateLogReplyProto parseFrom = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
+    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
+  }
+
+  @Override
+  public void updateConfiguration(LogName name, LogServiceConfiguration 
config) {
     // TODO Auto-generated method stub
 
   }
 
+
+  @Override
+  public RaftClient getRaftClient() {
+    return raftClient;
+  }
+
+  @Override
+  public LogServiceConfiguration getConfiguration() {
+    return config;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
new file mode 100644
index 0000000..06a28f5
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogReader;
+import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
+import org.apache.ratis.logservice.api.LogWriter;
+import org.apache.ratis.logservice.api.RecordListener;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LogStreamImpl implements LogStream {
+  public static final Logger LOG = 
LoggerFactory.getLogger(LogStreamImpl.class);
+
+  /*
+   * Log stream listeners
+   */
+  List<RecordListener> listeners;
+  /*
+   * Log stream name
+   */
+  LogName name;
+  /*
+   * Parent log service instance
+   */
+  LogService service;
+  /*
+   * Log stream configuration
+   */
+  LogServiceConfiguration config;
+  /*
+   * State
+   */
+  LogStream.State state;
+
+  /*
+   * Length
+   */
+  long length;
+
+  public LogStreamImpl(LogStreamProto proto, LogService service) {
+    this.service = service;
+    this.name = LogName.of(proto.getLogName().getName());
+    this.config = service.getConfiguration();
+    init();
+  }
+
+  public LogStreamImpl(LogName name, LogService logService) {
+    this.service = logService;
+    this.name = name;
+    this.config = this.service.getConfiguration();
+    init();
+  }
+
+  public LogStreamImpl(LogName name, LogService logService, 
LogServiceConfiguration config) {
+    this.service = logService;
+    this.name = name;
+    this.config = config;
+    init();
+  }
+
+  private void init() {
+    // TODO create new state machine. etc
+    state = State.OPEN;
+    listeners = Collections.synchronizedList(new ArrayList<RecordListener>());
+  }
+
+  @Override
+  public LogName getName() {
+    return name;
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public long getSize() {
+    // TODO use raft client to query state machine
+    return 0;
+  }
+
+  @Override
+  public LogReader createReader() {
+    return new LogReaderImpl(this);
+  }
+
+  @Override
+  public LogWriter createWriter() {
+    return new LogWriterImpl(this);
+  }
+
+  @Override
+  public long getLastRecordId() {
+    // TODO use raft client to query state machine
+    return 0;
+  }
+
+  @Override
+  public Collection<RecordListener> getRecordListeners() {
+    return listeners;
+  }
+
+  @Override
+  public LogServiceConfiguration getConfiguration() {
+    return config;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // TODO only flips the local state to CLOSED; nothing else is released yet
+    state = State.CLOSED;
+  }
+
+  @Override
+  public void addRecordListener(RecordListener listener) {
+    synchronized (listeners) {
+      if (!listeners.contains(listener)) {
+        listeners.add(listener);
+      }
+    }
+  }
+
+  @Override
+  public boolean removeRecordListener(RecordListener listener) {
+    return listeners.remove(listener);
+  }
+
+  @Override
+  public LogService getLogService() {
+    return service;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
new file mode 100644
index 0000000..92082ab
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogWriter;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException;
+import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogReplyProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LogWriterImpl implements LogWriter {
+  public static final Logger LOG = 
LoggerFactory.getLogger(LogWriterImpl.class);
+
+  /*
+   * Parent log stream
+   */
+  private LogStream parent;
+  /*
+   * Raft client
+   */
+  private RaftClient   raftClient;
+  /*
+   * Log service configuration object
+   */
+  private LogServiceConfiguration config;
+
+  public LogWriterImpl(LogStream logStream) {
+    this.parent = logStream;
+    this.raftClient = logStream.getLogService().getRaftClient();
+    this.config = logStream.getConfiguration();
+  }
+
+  @Override
+  public long write(ByteBuffer data) throws IOException {
+    List<ByteBuffer> list = new ArrayList<ByteBuffer>();
+    list.add(data);
+    RaftClientReply reply =
+        raftClient.send(Message.valueOf(LogServiceProtoUtil
+            .toAppendBBEntryLogRequestProto(parent.getName(), list)
+            .toByteString()));
+    AppendLogEntryReplyProto proto = 
AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent());
+    if (proto.hasException()) {
+      LogServiceException e = proto.getException();
+      throw new IOException(e.getErrorMsg());
+    }
+    //TODO return the real record id; 0 is a placeholder
+    return 0;
+  }
+
+  @Override
+  public long sync() throws IOException {
+    RaftClientReply reply =
+        raftClient.send(Message.valueOf(LogServiceProtoUtil
+            .toSyncLogRequestProto(parent.getName())
+            .toByteString()));
+    SyncLogReplyProto proto = 
SyncLogReplyProto.parseFrom(reply.getMessage().getContent());
+    if (proto.hasException()) {
+      LogServiceException e = proto.getException();
+      throw new IOException(e.getErrorMsg());
+    }
+    //TODO return the real record id; 0 is a placeholder
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    //TODO
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 998dc17..2e1b8da 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -1,39 +1,63 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required by applicable
- * law or agreed to in writing, software distributed under the License is 
distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License
- * for the specific language governing permissions and limitations under the 
License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.ratis.logservice.util;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.ratis.logservice.api.LogMessage;
 import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogService;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
-import org.apache.ratis.logservice.impl.BaseLogStream;
+import org.apache.ratis.logservice.impl.LogStreamImpl;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryReplyProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthReplyProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogStartIndexReplyProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogStartIndexRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto.Builder;
 import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException;
 import 
org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto;
 import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogRequestProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -78,6 +102,11 @@ public class LogServiceProtoUtil {
     return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build();
   }
 
+  /**
+   * Builds the reply to a close-log request. No fields are set on it.
+   *
+   * @return a freshly built {@code CloseLogReplyProto}
+   */
+  public static CloseLogReplyProto toCloseLogReplyProto() {
+    return CloseLogReplyProto.newBuilder().build();
+  }
+
   public static LogServiceRequestProto toGetStateRequestProto(LogName logName) 
{
     LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
     GetStateRequestProto getState =
@@ -99,15 +128,9 @@ public class LogServiceProtoUtil {
     return LogServiceRequestProto.newBuilder().setDeleteLog(deleteLog).build();
   }
 
-  public static LogMessage toLogMessage(
-      org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage message) {
-    if (!message.getData().isEmpty()) {
-      return new LogMessage(LogName.of(message.getLogName()), 
message.getData().toByteArray());
-    } else if (message.getLength() != 0) {
-      return new LogMessage(LogName.of(message.getLogName()), 
message.getLength());
-    } else {
-      return new LogMessage(LogName.of(message.getLogName()));
-    }
+  public static DeleteLogReplyProto toDeleteLogReplyProto() {
+    DeleteLogReplyProto.Builder builder = DeleteLogReplyProto.newBuilder();
+    return builder.build();
   }
 
   public static LogNameProto toLogNameProto(LogName logName) {
@@ -132,10 +155,8 @@ public class LogServiceProtoUtil {
     return logStreamProto;
   }
 
-  public static LogStream toLogStream(LogStreamProto logStream) {
-    return new BaseLogStream(toLogName(logStream.getLogName()),
-        (logStream.getState() == LogStreamState.OPEN ? State.OPEN : 
State.CLOSED),
-        logStream.getSize());
+  public static LogStream toLogStream(LogStreamProto logStream, LogService 
parent) {
+    return new LogStreamImpl(logStream, parent);
   }
 
   public static CreateLogReplyProto toCreateLogReplyProto(LogStream logStream) 
{
@@ -160,14 +181,86 @@ public class LogServiceProtoUtil {
     return newBuilder.build();
   }
 
-  public static List<LogStream> toListLogStreams(List<LogStreamProto> 
logStreamProtos) {
+  /**
+   * Builds the reply to an archive-log request. No fields are set on it.
+   *
+   * @return a freshly built {@code ArchiveLogReplyProto}
+   */
+  public static ArchiveLogReplyProto toArchiveLogReplyProto() {
+    return ArchiveLogReplyProto.newBuilder().build();
+  }
+
+  /**
+   * Wraps a log-length query for the given log into a service request.
+   *
+   * @param name the log whose length is being queried
+   * @return a request with its length-query field set
+   */
+  public static LogServiceRequestProto toGetLengthRequestProto(LogName name) {
+    GetLogLengthRequestProto lengthQuery = GetLogLengthRequestProto.newBuilder()
+        .setLogName(LogNameProto.newBuilder().setName(name.getName()).build())
+        .build();
+    return LogServiceRequestProto.newBuilder().setLengthQuery(lengthQuery).build();
+  }
+
+  /**
+   * Wraps a start-index query for the given log into a service request.
+   *
+   * @param name the log whose start index is being queried
+   * @return a request with its start-index-query field set
+   */
+  public static LogServiceRequestProto toGetStartIndexProto(LogName name) {
+    GetLogStartIndexRequestProto startIndexQuery = GetLogStartIndexRequestProto.newBuilder()
+        .setLogName(LogNameProto.newBuilder().setName(name.getName()).build())
+        .build();
+    return LogServiceRequestProto.newBuilder().setStartIndexQuery(startIndexQuery).build();
+  }
+
+  /**
+   * Wraps a read query for the given log into a service request.
+   *
+   * @param name  the log to read from
+   * @param start value stored as the request's start record id
+   * @param total value stored as the request's number of records
+   * @return a request with its read-next-query field set
+   */
+  public static LogServiceRequestProto toReadLogRequestProto(LogName name, long start, int total) {
+    ReadLogRequestProto readQuery = ReadLogRequestProto.newBuilder()
+        .setLogName(LogNameProto.newBuilder().setName(name.getName()).build())
+        .setStartRecordId(start)
+        .setNumRecords(total)
+        .build();
+    return LogServiceRequestProto.newBuilder().setReadNextQuery(readQuery).build();
+  }
+
+  /**
+   * Wraps a sync request for the given log into a service request.
+   *
+   * @param name the log to sync
+   * @return a request with its sync-request field set
+   */
+  public static LogServiceRequestProto toSyncLogRequestProto(LogName name) {
+    SyncLogRequestProto syncRequest = SyncLogRequestProto.newBuilder()
+        .setLogName(LogNameProto.newBuilder().setName(name.getName()).build())
+        .build();
+    return LogServiceRequestProto.newBuilder().setSyncRequest(syncRequest).build();
+  }
+
+  /**
+   * Wraps an append request carrying the given byte-array entries into a
+   * service request. Each entry is copied into the request, in list order.
+   *
+   * @param name    the log to append to
+   * @param entries the record payloads to append
+   * @return a request with its append-request field set
+   */
+  public static LogServiceRequestProto toAppendEntryLogRequestProto(LogName name,
+      List<byte[]> entries) {
+    AppendLogEntryRequestProto.Builder append = AppendLogEntryRequestProto.newBuilder();
+    append.setLogName(LogNameProto.newBuilder().setName(name.getName()).build());
+    for (byte[] entry : entries) {
+      append.addData(ByteString.copyFrom(entry));
+    }
+    return LogServiceRequestProto.newBuilder().setAppendRequest(append.build()).build();
+  }
+
+  /**
+   * Wraps an append request carrying the given {@link ByteBuffer} entries
+   * into a service request. Each buffer is copied into the request, in list
+   * order.
+   *
+   * @param name    the log to append to
+   * @param entries the record payloads to append
+   * @return a request with its append-request field set
+   */
+  public static LogServiceRequestProto toAppendBBEntryLogRequestProto(LogName name,
+      List<ByteBuffer> entries) {
+    AppendLogEntryRequestProto.Builder append = AppendLogEntryRequestProto.newBuilder();
+    append.setLogName(LogNameProto.newBuilder().setName(name.getName()).build());
+    for (ByteBuffer entry : entries) {
+      append.addData(ByteString.copyFrom(entry));
+    }
+    return LogServiceRequestProto.newBuilder().setAppendRequest(append.build()).build();
+  }
+
+  public static List<LogStream> toListLogStreams(List<LogStreamProto> 
logStreamProtos,
+      LogService parent) {
     List<LogStream> logStreams = new ArrayList<>(logStreamProtos.size());
     for (LogStreamProto proto : logStreamProtos) {
-      logStreams.add(toLogStream(proto));
+      logStreams.add(toLogStream(proto, parent));
     }
     return logStreams;
   }
 
+  /**
+   * Converts every {@code ByteString} in the list to a byte array.
+   *
+   * @param list the byte strings to convert
+   * @return a new list holding one array per input element, in the same order
+   */
+  public static List<byte[]> toListByteArray(List<ByteString> list) {
+    List<byte[]> arrays = new ArrayList<>(list.size());
+    for (ByteString bytes : list) {
+      arrays.add(bytes.toByteArray());
+    }
+    return arrays;
+  }
+
   public static GetLogReplyProto toGetLogReplyProto(LogStream logStream) {
     return 
GetLogReplyProto.newBuilder().setLogStream(toLogStreamProto(logStream)).build();
   }
@@ -176,4 +269,79 @@ public class LogServiceProtoUtil {
     return GetStateReplyProto.newBuilder()
         .setState(exists ? LogStreamState.OPEN : 
LogStreamState.CLOSED).build();
   }
+
+
+  /**
+   * Builds the reply to a log-length query.
+   *
+   * @param length the length to report; used only when {@code t} is null
+   * @param t      the failure to report instead of a length, or null on success
+   * @return a reply holding either the length or the converted exception
+   */
+  public static GetLogLengthReplyProto toGetLogLengthReplyProto(long length, Throwable t) {
+    GetLogLengthReplyProto.Builder reply = GetLogLengthReplyProto.newBuilder();
+    if (t == null) {
+      reply.setLength(length);
+    } else {
+      reply.setException(toLogException(t));
+    }
+    return reply.build();
+  }
+
+  /**
+   * Builds the reply to a start-index query.
+   *
+   * @param length the value stored in the reply's start-index field (despite
+   *               the parameter's name); used only when {@code t} is null
+   * @param t      the failure to report instead, or null on success
+   * @return a reply holding either the start index or the converted exception
+   */
+  public static GetLogStartIndexReplyProto toGetLogStartIndexReplyProto(long length, Throwable t) {
+    GetLogStartIndexReplyProto.Builder reply = GetLogStartIndexReplyProto.newBuilder();
+    if (t == null) {
+      reply.setStartIndex(length);
+    } else {
+      reply.setException(toLogException(t));
+    }
+    return reply.build();
+  }
+
+  /**
+   * Builds the reply to a read query.
+   *
+   * @param entries the records to return; read only when {@code t} is null
+   * @param t       the failure to report instead, or null on success
+   * @return a reply holding either the copied records or the converted exception
+   */
+  public static ReadLogReplyProto toReadLogReplyProto(List<byte[]> entries, Throwable t) {
+    ReadLogReplyProto.Builder reply = ReadLogReplyProto.newBuilder();
+    if (t != null) {
+      // Failure path: the entries are never touched.
+      return reply.setException(toLogException(t)).build();
+    }
+    for (byte[] entry : entries) {
+      reply.addLogRecord(ByteString.copyFrom(entry));
+    }
+    return reply.build();
+  }
+
+  /**
+   * Builds the reply to an append request.
+   *
+   * @param ids the record ids to report, in order; may be null, in which case
+   *            no ids are added. Ignored when {@code t} is not null.
+   * @param t   the failure to report instead of ids, or null on success
+   * @return a reply holding either the record ids or the converted exception
+   */
+  public static AppendLogEntryReplyProto toAppendLogReplyProto(List<Long> ids, Throwable t) {
+    AppendLogEntryReplyProto.Builder builder = AppendLogEntryReplyProto.newBuilder();
+    if (t != null) {
+      builder.setException(toLogException(t));
+    } else if (ids != null) {
+      for (long id : ids) {
+        // Append to the repeated field. The indexed setter only replaces an
+        // existing element and throws IndexOutOfBoundsException on a new builder.
+        builder.addRecordId(id);
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Builds the reply to a sync request.
+   *
+   * @param t the failure to report, or null on success
+   * @return a reply with no fields set on success, or with the converted
+   *         exception set otherwise
+   */
+  public static SyncLogReplyProto toSyncLogReplyProto(Throwable t) {
+    SyncLogReplyProto.Builder reply = SyncLogReplyProto.newBuilder();
+    if (t == null) {
+      return reply.build();
+    }
+    return reply.setException(toLogException(t)).build();
+  }
+
+  /**
+   * Builds a successful reply to a log-length query.
+   *
+   * <p>Static like every other helper in this utility class, so no instance
+   * is needed to call it; existing calls through an instance still compile.
+   *
+   * @param length the length to report
+   * @return a reply with only its length field set
+   */
+  public static GetLogLengthReplyProto toGetLogLengthReplyProto(long length) {
+    // Delegate to the two-argument overload; a null Throwable selects the success path.
+    return toGetLogLengthReplyProto(length, null);
+  }
+
+  /**
+   * Converts a {@link Throwable} into its protobuf representation: the
+   * exception class name, its message and its stack trace (one element per
+   * line).
+   *
+   * @param t the throwable to convert; must not be null
+   * @return the populated {@code LogServiceException} message
+   */
+  public static LogServiceException toLogException(Throwable t) {
+    LogServiceException.Builder builder = LogServiceException.newBuilder();
+    builder.setExceptionClassName(t.getClass().getName());
+    // Throwable#getMessage() may be null, but protobuf string setters reject
+    // null with a NullPointerException, so fall back to an empty message.
+    String message = t.getMessage();
+    builder.setErrorMsg(message != null ? message : "");
+    StringBuilder trace = new StringBuilder();
+    for (StackTraceElement element : t.getStackTrace()) {
+      trace.append(element).append("\n");
+    }
+    // Use a fixed charset: the platform default may differ between the
+    // process that encodes the trace and the one that decodes it.
+    builder.setStacktrace(ByteString.copyFromUtf8(trace.toString()));
+    return builder.build();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java
deleted file mode 100644
index 3709b6f..0000000
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.util;
-
-public class Utils {
-
-
-  public static int long2bytes(long v, byte[] buf, int offset) {
-    int2bytes((int)(v >>> 32), buf, offset);
-    int2bytes((int) v        , buf, offset + 4);
-    return 8;
-  }
-
-  public static int bytes2int(byte[] buf, int offset) {
-    return (buf[offset] << 24)
-        + ((0xFF & buf[offset + 1]) << 16)
-        + ((0xFF & buf[offset + 2]) <<  8)
-        +  (0xFF & buf[offset + 3]);
-  }
-
-  public static long bytes2long(byte[] buf, int offset) {
-    return ((long)bytes2int(buf, offset) << 32)
-        + (0xFFFFFFFFL & bytes2int(buf, offset + 4));
-  }
-
-  public static int int2bytes(int v, byte[] buf, int offset) {
-    buf[offset    ] = (byte) (v >>> 24);
-    buf[offset + 1] = (byte) (v >>> 16);
-    buf[offset + 2] = (byte) (v >>> 8);
-    buf[offset + 3] = (byte) (v);
-    return 4;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
index ecdc905..8255351 100644
--- 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
@@ -28,6 +28,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogStateMachine;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
@@ -47,6 +48,7 @@ public abstract class LogServiceBaseTest<CLUSTER extends 
MiniRaftCluster>
     final RaftProperties p = getProperties();
     p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         LogStateMachine.class, StateMachine.class);
+    LOG.info("Set LogStateMachine OK");
   }
 
   static final int NUM_PEERS = 3;
@@ -64,7 +66,8 @@ public abstract class LogServiceBaseTest<CLUSTER extends 
MiniRaftCluster>
     RaftClient raftClient =
         
RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
             .build();
-    LogService logService = 
LogServiceFactory.getInstance().createLogService(raftClient);
+    LogService logService = 
LogServiceFactory.getInstance().createLogService(raftClient,
+      new LogServiceConfiguration());
     LogName logName = LogName.of("log1");
     LogStream logStream = logService.createLog(logName);
     assertEquals("log1", logStream.getName().getName());
@@ -81,7 +84,7 @@ public abstract class LogServiceBaseTest<CLUSTER extends 
MiniRaftCluster>
     State state = logService.getState(logName);
     assertEquals(State.OPEN, state);
   }
-  
+
   @After
   public void tearDown() {
     cluster.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java
index 9808b23..a0b38f6 100644
--- 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java
@@ -19,12 +19,10 @@ package org.apache.ratis.logservice.api;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.ratis.logservice.dummy.DummyLogService;
 import org.junit.Test;
@@ -39,7 +37,7 @@ public class TestApiExample {
   }
 
   @Test
-  public void test() throws IOException, InterruptedException, 
ExecutionException {
+  public void test() throws Exception {
     try (LogService svc = new DummyLogService()) {
       LogStream log1 = svc.createLog(LogName.of("log1"));
       // Write some data

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java
deleted file mode 100644
index d58bdda..0000000
--- 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.logservice.api;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.ratis.logservice.api.LogMessage.Type;
-import org.junit.Test;
-
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-
-public class TestLogMessage {
-
-  @Test
-  public void testLogMessages() throws InvalidProtocolBufferException {
-
-    LogMessage msg = new LogMessage(LogName.of("testLog"));
-    assertEquals(Type.READ_REQUEST, msg.getType());
-    ByteString content = msg.getContent();
-    LogMessage other = LogMessage.parseFrom(content);
-    assertEquals(msg.toString(), other.toString());
-
-    msg = new LogMessage(LogName.of("testLog"), 100);
-    assertEquals(Type.READ_REPLY, msg.getType());
-    content = msg.getContent();
-    other = LogMessage.parseFrom(content);
-    assertEquals(msg.toString(), other.toString());
-
-    msg = new LogMessage(LogName.of("testLog"), new byte[] { 0, 0, 0 });
-    assertEquals(Type.WRITE, msg.getType());
-    content = msg.getContent();
-    other = LogMessage.parseFrom(content);
-    assertEquals(msg.toString(), other.toString());
-    assertEquals(msg.getData().length, other.getData().length);
-
-  }
-
-
-}

Reply via email to