Repository: incubator-ratis
Updated Branches:
  refs/heads/master 737db6543 -> 59de6bd6b


RATIS-279 Create administrative API for LogService

Signed-off-by: Josh Elser <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/59de6bd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/59de6bd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/59de6bd6

Branch: refs/heads/master
Commit: 59de6bd6ba92fa24c7efdce0feffb814693c268e
Parents: 737db65
Author: Rajeshbabu Chintaguntla <[email protected]>
Authored: Tue Oct 9 23:51:12 2018 -0400
Committer: Josh Elser <[email protected]>
Committed: Tue Oct 9 23:51:12 2018 -0400

----------------------------------------------------------------------
 .../ratis/logservice/LogServiceFactory.java     |   4 +-
 .../apache/ratis/logservice/api/LogName.java    |  12 +-
 .../apache/ratis/logservice/api/LogService.java |  16 +-
 .../ratis/logservice/api/LogStateMachine.java   | 161 ++++++++++++++---
 .../ratis/logservice/impl/BaseLogStream.java    |  85 +++++++++
 .../ratis/logservice/impl/LogServiceImpl.java   | 145 +++++++++++++++
 .../logservice/util/LogServiceProtoUtil.java    | 179 +++++++++++++++++++
 .../ratis/logservice/LogServiceBaseTest.java    |  89 +++++++++
 .../logservice/TestLogServiceWithGrpc.java      |  24 +++
 .../logservice/TestLogServiceWithNetty.java     |  25 +++
 ratis-proto/src/main/proto/Logservice.proto     |  80 +++++++++
 11 files changed, 787 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
index 4ef801e..084188e 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java
@@ -19,6 +19,7 @@ package org.apache.ratis.logservice;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.impl.LogServiceImpl;
 
 public class LogServiceFactory {
   private static final LogServiceFactory INSTANCE = new LogServiceFactory();
@@ -31,8 +32,7 @@ public class LogServiceFactory {
    * @param raftClient The client to a Raft quorum.
    */
   public LogService createLogService(RaftClient raftClient) {
-    //TODO return new LogServiceImpl();
-    return null;
+    return new LogServiceImpl(raftClient);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java
index bbeb0a5..3405340 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java
@@ -21,6 +21,10 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.Objects;
 
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
 /**
  * Identifier to uniquely identify a {@link LogStream}.
  */
@@ -42,7 +46,7 @@ public class LogName {
    * of identifying one LogStream/LogName from another. Users need only know 
how to construct a {@link LogName}
    * and then use that in their application.
    */
-  String getName() {
+  public String getName() {
     return name;
   }
 
@@ -76,4 +80,10 @@ public class LogName {
     // TODO Limit allowed characters in the name?
     return new LogName(name);
   }
+
+  /**
+   * Builds a {@link LogName} from the serialized bytes of a {@link LogNameProto}.
+   *
+   * @param logName serialized {@code LogNameProto} bytes
+   * @return a LogName wrapping the name carried by the parsed message
+   * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+   */
+  public static LogName parseFrom(ByteString logName)
+      throws InvalidProtocolBufferException {
+    final String parsedName = LogNameProto.parseFrom(logName).getName();
+    return new LogName(parsedName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
index 860cb33..c78ed2a 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java
@@ -37,7 +37,7 @@ public interface LogService extends AutoCloseable {
    *
    * @param name Unique name for this LogStream.
    */
-  LogStream createLog(LogName name);
+  LogStream createLog(LogName name) throws IOException;
 
   /**
    * Creates a new {@link LogStream} identified by the given name. Throws
@@ -46,7 +46,7 @@ public interface LogService extends AutoCloseable {
    * @param name Unique name for this LogStream.
    * @param config Configuration object for this LogStream
    */
-  LogStream createLog(LogName name, LogStreamConfiguration config);
+  LogStream createLog(LogName name, LogStreamConfiguration config) throws 
IOException;
 
   /*
    * How to get LogStreams that already exist
@@ -56,12 +56,12 @@ public interface LogService extends AutoCloseable {
    *
    * @param name The name of the LogStream
    */
-  LogStream getLog(LogName name);
+  LogStream getLog(LogName name) throws IOException;
 
   /**
    * Lists all {@link LogStream} instances known by this LogService.
    */
-  Iterator<LogStream> listLogs();
+  Iterator<LogStream> listLogs() throws IOException;
 
   /*
    * How to close, archive, and delete LogStreams
@@ -74,14 +74,14 @@ public interface LogService extends AutoCloseable {
    * @param name The name of the log to close
    */
   // TODO this name sucks, confusion WRT the Java Closeable interface.
-  void closeLog(LogName name);
+  void closeLog(LogName name) throws IOException;
 
   /**
    * Returns the current {@link State} of the log identified by {@code name}.
    *
    * @param name The name of a log
    */
-  State getState(LogName name);
+  State getState(LogName name) throws IOException;
 
   /**
    * Archives the given log out of the state machine and into a configurable 
long-term storage. A log must be
@@ -89,13 +89,13 @@ public interface LogService extends AutoCloseable {
    *
    * @param name The name of the log to archive.
    */
-  void archiveLog(LogName name);
+  void archiveLog(LogName name) throws IOException;
 
   /**
    * Deletes the {@link LogStream}.
    * @param name The name of the LogStream
    */
-  void deleteLog(LogName name);
+  void deleteLog(LogName name) throws IOException;
 
   /*
    * Change the configuration of a LogStream or manipulate a LogStream's 
listeners

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
index cdc17f3..84689be 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
@@ -17,6 +17,9 @@
  */
 package org.apache.ratis.logservice.api;
 
+import org.apache.ratis.logservice.api.LogStream.State;
+import org.apache.ratis.logservice.impl.BaseLogStream;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
@@ -25,18 +28,31 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.util.AutoCloseableLock;
-
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
 import java.io.*;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -166,32 +182,133 @@ public class LogStateMachine extends BaseStateMachine {
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     try {
       final LogEntryProto entry = trx.getLogEntry();
-      final LogMessage logMessage = 
LogMessage.parseFrom((entry.getSmLogEntry().getData()));
-
-      final long index = entry.getIndex();
-      Long val = null;
-      LogName name = null;
-      try (final AutoCloseableLock writeLock = writeLock()) {
-        name = logMessage.getLogName();
-        long dataLength = logMessage.getData().length;
-        val = state.get(name);
-        if (val == null) {
-          val = new Long(0);
+      LogServiceRequestProto logServiceRequestProto =
+          LogServiceRequestProto.parseFrom(entry.getSmLogEntry().getData());
+      CompletableFuture<Message> f = null;
+      switch (logServiceRequestProto.getRequestCase()) {
+       case LOGMESSAGE:
+        org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage 
logMessage2 = logServiceRequestProto.getLogMessage();
+        final LogMessage logMessage = 
LogMessage.parseFrom((entry.getSmLogEntry().getData()));
+
+        final long index = entry.getIndex();
+        Long val = null;
+        LogName name = null;
+        try (final AutoCloseableLock writeLock = writeLock()) {
+          name = logMessage.getLogName();
+          long dataLength = logMessage.getData().length;
+          val = state.get(name);
+          if (val == null) {
+            val = new Long(0);
+          }
+          state.put(name, val + dataLength);
+          updateLastAppliedTermIndex(entry.getTerm(), index);
         }
-        state.put(name, val + dataLength);
-        updateLastAppliedTermIndex(entry.getTerm(), index);
-      }
-      final CompletableFuture<Message> f =
-          CompletableFuture.completedFuture(new LogMessage(name, val));
-      final RaftProtos.RaftPeerRole role = trx.getServerRole();
-      LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, 
logMessage, val);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("{}-{}: variables={}", getId(), index, state);
+        f =
+            CompletableFuture.completedFuture(new LogMessage(name, val));
+        final RaftProtos.RaftPeerRole role = trx.getServerRole();
+        LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, 
logMessage, val);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{}-{}: variables={}", getId(), index, state);
+        }
+        return f;
+      case CREATELOG:
+        return processCreateLogRequest(logServiceRequestProto);
+      case LISTLOGS:
+        return processListLogsRequest();
+      case GETLOG:
+        return processGetLogRequest(logServiceRequestProto);
+      case GETSTATE:
+        return processGetStateRequest(logServiceRequestProto);
+      case ARCHIVELOG:
+        return processArchiveLog(logServiceRequestProto);
+      case CLOSELOG:
+        return processCloseLog(logServiceRequestProto);
+      case DELETELOG:
+        return processDeleteLog(logServiceRequestProto);
+      default:
+        return null;
       }
-      return f;
     } catch (InvalidProtocolBufferException e) {
       // TODO exception handling
       throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Handles a DeleteLog request: removes the named log from {@code state} while
+   * holding the write lock and answers with an empty {@link DeleteLogReplyProto}.
+   */
+  private CompletableFuture<Message>
+      processDeleteLog(LogServiceRequestProto logServiceRequestProto) {
+    final LogName target =
+        LogServiceProtoUtil.toLogName(logServiceRequestProto.getDeleteLog().getLogName());
+    try (final AutoCloseableLock ignored = writeLock()) {
+      state.remove(target);
+    }
+    // TODO need to handle exceptions while operating with files.
+    final DeleteLogReplyProto reply = DeleteLogReplyProto.newBuilder().build();
+    return CompletableFuture.completedFuture(Message.valueOf(reply.toByteString()));
+  }
+
+  private CompletableFuture<Message> processCloseLog(LogServiceRequestProto 
logServiceRequestProto) {
+    CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+    LogName logName = LogServiceProtoUtil.toLogName(closeLog.getLogName());
+    // Need to check whether the file is opened if opened close it.
+    // TODO need to handle exceptions while operating with files.
+    return CompletableFuture.completedFuture(Message
+      .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+  }
+
+  private CompletableFuture<Message>
+      processArchiveLog(LogServiceRequestProto logServiceRequestProto) {
+    ArchiveLogRequestProto archiveLog = logServiceRequestProto.getArchiveLog();
+    LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+    // Handle log archiving.
+    return CompletableFuture.completedFuture(Message
+      .valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString()));
+  }
+
+  private CompletableFuture<Message> processGetStateRequest(
+      LogServiceRequestProto logServiceRequestProto) {
+    GetStateRequestProto getState = logServiceRequestProto.getGetState();
+    LogName logName = LogServiceProtoUtil.toLogName(getState.getLogName());
+    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
+        .toGetStateReplyProto(state.containsKey(logName)).toByteString()));
+  }
+
+  /**
+   * Handles a CreateLog request. Under the write lock, the named log is registered
+   * in {@code state} with size 0 unless it is already tracked, in which case the
+   * existing size is kept. The reply describes the log as an OPEN stream of that size.
+   */
+  private CompletableFuture<Message> processCreateLogRequest(
+      LogServiceRequestProto logServiceRequestProto) {
+    LogName name;
+    long size;
+    try (final AutoCloseableLock ignored = writeLock()) {
+      name = LogServiceProtoUtil.toLogName(logServiceRequestProto.getCreateLog().getLogName());
+      // putIfAbsent replaces the get/null-check/put sequence and avoids the
+      // deprecated boxing constructor new Long(0).
+      final Long existing = state.putIfAbsent(name, 0L);
+      size = existing == null ? 0L : existing;
+    }
+    return CompletableFuture.completedFuture(Message.valueOf(
+        LogServiceProtoUtil.toCreateLogReplyProto(new BaseLogStream(name, State.OPEN, size))
+            .toByteString()));
+  }
+
+  private CompletableFuture<Message> processListLogsRequest() {
+    List<LogStream> logStreams = new ArrayList<LogStream>(state.size());
+    for (Entry<LogName, Long> e : state.entrySet()) {
+      logStreams.add(new BaseLogStream(e.getKey(), State.OPEN, e.getValue()));
+    }
+    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
+        .toListLogLogsReplyProto(logStreams).toByteString()));
+  }
+
+  private CompletableFuture<Message> processGetLogRequest(
+      LogServiceRequestProto logServiceRequestProto) {
+    GetLogRequestProto getLog = logServiceRequestProto.getGetLog();
+    LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName());
+    if (state.containsKey(logName)) {
+      return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
+          .toGetLogReplyProto(new BaseLogStream(logName, State.OPEN, 
state.get(logName)))
+          .toByteString()));
+    } else {
+      return 
CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder()
+          .build().toByteString()));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java
new file mode 100644
index 0000000..4fe830e
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.util.Set;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogReader;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogStreamConfiguration;
+import org.apache.ratis.logservice.api.LogWriter;
+import org.apache.ratis.logservice.api.RecordListener;
+
+/**
+ * Simple {@link LogStream} value holding a name, a state and a size, all fixed at
+ * construction time. Reader, writer, record-id, listener and configuration
+ * accessors are unimplemented stubs for now.
+ */
+public class BaseLogStream implements LogStream {
+  // Assigned once in the constructor and never changed afterwards.
+  private final LogName logName;
+  private final State state;
+  private final long size;
+
+  /**
+   * @param logName name identifying this stream
+   * @param state state reported by {@link #getState()}
+   * @param size size reported by {@link #getSize()}
+   */
+  public BaseLogStream(LogName logName, State state, long size) {
+    this.logName = logName;
+    this.state = state;
+    this.size = size;
+  }
+
+  @Override
+  public LogName getName() {
+    return logName;
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public long getSize() {
+    return size;
+  }
+
+  /** Not implemented yet; currently returns {@code null}. */
+  @Override
+  public LogReader createReader() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /** Not implemented yet; currently returns {@code null}. */
+  @Override
+  public LogWriter createWriter() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /** Not implemented yet; currently returns {@code 0}. */
+  @Override
+  public long getLastRecordId() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  /** Not implemented yet; currently returns {@code null}. */
+  @Override
+  public Set<RecordListener> getRecordListeners() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /** Not implemented yet; currently returns {@code null}. */
+  @Override
+  public LogStreamConfiguration getConfiguration() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
new file mode 100644
index 0000000..407e498
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogStream.State;
+import org.apache.ratis.logservice.api.LogStreamConfiguration;
+import org.apache.ratis.logservice.api.RecordListener;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+
+/**
+ * {@link LogService} implementation that turns each administrative call into a
+ * request message, sends it through a {@link RaftClient} and decodes the reply.
+ * Configuration updates, record listeners and {@link #close()} are not implemented yet.
+ */
+public class LogServiceImpl implements LogService {
+
+  private final RaftClient raftClient;
+
+  /**
+   * @param raftClient client used to send every request of this service
+   */
+  public LogServiceImpl(RaftClient raftClient) {
+    this.raftClient = raftClient;
+  }
+
+  /**
+   * Sends a CreateLog request for {@code name} and returns the stream described by the reply.
+   *
+   * @throws IOException if sending the request or parsing the reply fails
+   */
+  @Override
+  public LogStream createLog(LogName name) throws IOException {
+    final RaftClientReply reply = raftClient.send(
+        Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name).toByteString()));
+    final CreateLogReplyProto created =
+        CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
+    return LogServiceProtoUtil.toLogStream(created.getLogStream());
+  }
+
+  /**
+   * Same as {@link #createLog(LogName)}; {@code config} is currently ignored.
+   */
+  @Override
+  public LogStream createLog(LogName name, LogStreamConfiguration config) throws IOException {
+    // TODO need to make changes in the create log request to pass config.
+    return createLog(name);
+  }
+
+  /**
+   * Sends a GetLog request for {@code name} and returns the stream described by the reply.
+   *
+   * @throws IOException if sending the request or parsing the reply fails
+   */
+  @Override
+  public LogStream getLog(LogName name) throws IOException {
+    final RaftClientReply reply = raftClient.send(
+        Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name).toByteString()));
+    final GetLogReplyProto found = GetLogReplyProto.parseFrom(reply.getMessage().getContent());
+    return LogServiceProtoUtil.toLogStream(found.getLogStream());
+  }
+
+  /**
+   * Sends a ListLogs request and returns an iterator over the streams listed in the reply.
+   *
+   * @throws IOException if sending the request or parsing the reply fails
+   */
+  @Override
+  public Iterator<LogStream> listLogs() throws IOException {
+    final RaftClientReply reply = raftClient.send(
+        Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString()));
+    final ListLogsReplyProto listing =
+        ListLogsReplyProto.parseFrom(reply.getMessage().getContent());
+    final List<LogStreamProto> streamProtos = listing.getLogStremsList();
+    return LogServiceProtoUtil.toListLogStreams(streamProtos).iterator();
+  }
+
+  /**
+   * Sends a CloseLog request for {@code name}.
+   *
+   * @throws IOException if sending the request or parsing the reply fails
+   */
+  @Override
+  public void closeLog(LogName name) throws IOException {
+    final RaftClientReply reply = raftClient.send(
+        Message.valueOf(LogServiceProtoUtil.toCloseLogRequestProto(name).toByteString()));
+    // Parsed only to validate the reply; its content is not used.
+    CloseLogReplyProto.parseFrom(reply.getMessage().getContent());
+  }
+
+  /**
+   * Sends a GetState request for {@code name}. Returns {@code State.OPEN} when the
+   * reply reports {@code LogStreamState.OPEN} and {@code State.CLOSED} for anything else.
+   *
+   * @throws IOException if sending the request or parsing the reply fails
+   */
+  @Override
+  public State getState(LogName name) throws IOException {
+    final RaftClientReply reply = raftClient.send(
+        Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name).toByteString()));
+    final GetStateReplyProto stateReply =
+        GetStateReplyProto.parseFrom(reply.getMessage().getContent());
+    return stateReply.getState() == LogStreamState.OPEN ? State.OPEN : State.CLOSED;
+  }
+
+  /**
+   * Sends an ArchiveLog request for {@code name}.
+   *
+   * @throws IOException if sending the request or parsing the reply fails
+   */
+  @Override
+  public void archiveLog(LogName name) throws IOException {
+    final RaftClientReply reply = raftClient.send(
+        Message.valueOf(LogServiceProtoUtil.toArchiveLogRequestProto(name).toByteString()));
+    // Parsed only to validate the reply; its content is not used.
+    ArchiveLogReplyProto.parseFrom(reply.getMessage().getContent());
+  }
+
+  /**
+   * Sends a DeleteLog request for {@code name}.
+   *
+   * @throws IOException if sending the request or parsing the reply fails
+   */
+  @Override
+  public void deleteLog(LogName name) throws IOException {
+    final RaftClientReply reply = raftClient.send(
+        Message.valueOf(LogServiceProtoUtil.toDeleteLogRequestProto(name).toByteString()));
+    // Parsed only to validate the reply; its content is not used.
+    DeleteLogReplyProto.parseFrom(reply.getMessage().getContent());
+  }
+
+  /** Not implemented yet; currently does nothing. */
+  @Override
+  public void updateConfiguration(LogName name, LogStreamConfiguration config) {
+    // TODO Auto-generated method stub
+
+  }
+
+  /** Not implemented yet; currently does nothing. */
+  @Override
+  public void addRecordListener(LogName name, RecordListener listener) {
+    // TODO Auto-generated method stub
+
+  }
+
+  /** Not implemented yet; currently does nothing. */
+  @Override
+  public void removeRecordListener(LogName name, RecordListener listener) {
+    // TODO Auto-generated method stub
+
+  }
+
+  /** Not implemented yet; currently does nothing. */
+  @Override
+  public void close() throws IOException {
+    // TODO Auto-generated method stub
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
new file mode 100644
index 0000000..998dc17
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required by applicable
+ * law or agreed to in writing, software distributed under the License is 
distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License
+ * for the specific language governing permissions and limitations under the 
License.
+ */
+package org.apache.ratis.logservice.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ratis.logservice.api.LogMessage;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogStream.State;
+import org.apache.ratis.logservice.impl.BaseLogStream;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto.Builder;
+import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto;
+import 
org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto;
+import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState;
+
+/**
+ * Static helpers that convert between the LogService API types ({@link LogName},
+ * {@link LogStream}, {@link LogMessage}) and the protobuf messages generated from
+ * {@code Logservice.proto}.
+ */
+public class LogServiceProtoUtil {
+
+  /**
+   * Builds the request envelope asking for the named log to be created.
+   *
+   * @param logName name of the log to create
+   * @return a {@link LogServiceRequestProto} with its {@code createLog} field set
+   */
+  public static LogServiceRequestProto toCreateLogRequestProto(LogName logName) {
+    CreateLogRequestProto createLog =
+        CreateLogRequestProto.newBuilder().setLogName(toLogNameProto(logName)).build();
+    return LogServiceRequestProto.newBuilder().setCreateLog(createLog).build();
+  }
+
+  /**
+   * Builds the request envelope asking for the list of logs.
+   *
+   * @return a {@link LogServiceRequestProto} with an empty {@code listLogs} request set
+   */
+  public static LogServiceRequestProto toListLogRequestProto() {
+    ListLogsRequestProto listLogs = ListLogsRequestProto.newBuilder().build();
+    return LogServiceRequestProto.newBuilder().setListLogs(listLogs).build();
+  }
+
+  /**
+   * Builds the request envelope asking for the named log.
+   *
+   * @param name name of the log to look up
+   * @return a {@link LogServiceRequestProto} with its {@code getLog} field set
+   */
+  public static LogServiceRequestProto toGetLogRequestProto(LogName name) {
+    GetLogRequestProto getLog =
+        GetLogRequestProto.newBuilder().setLogName(toLogNameProto(name)).build();
+    return LogServiceRequestProto.newBuilder().setGetLog(getLog).build();
+  }
+
+  /**
+   * Builds the request envelope asking for the named log to be closed.
+   *
+   * @param logName name of the log to close
+   * @return a {@link LogServiceRequestProto} with its {@code closeLog} field set
+   */
+  public static LogServiceRequestProto toCloseLogRequestProto(LogName logName) {
+    CloseLogRequestProto closeLog =
+        CloseLogRequestProto.newBuilder().setLogName(toLogNameProto(logName)).build();
+    return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build();
+  }
+
+  /**
+   * Builds the request envelope asking for the state of the named log.
+   *
+   * @param logName name of the log whose state is requested
+   * @return a {@link LogServiceRequestProto} with its {@code getState} field set
+   */
+  public static LogServiceRequestProto toGetStateRequestProto(LogName logName) {
+    GetStateRequestProto getState =
+        GetStateRequestProto.newBuilder().setLogName(toLogNameProto(logName)).build();
+    return LogServiceRequestProto.newBuilder().setGetState(getState).build();
+  }
+
+  /**
+   * Builds the request envelope asking for the named log to be archived.
+   *
+   * @param logName name of the log to archive
+   * @return a {@link LogServiceRequestProto} with its {@code archiveLog} field set
+   */
+  public static LogServiceRequestProto toArchiveLogRequestProto(LogName logName) {
+    ArchiveLogRequestProto archiveLog =
+        ArchiveLogRequestProto.newBuilder().setLogName(toLogNameProto(logName)).build();
+    return LogServiceRequestProto.newBuilder().setArchiveLog(archiveLog).build();
+  }
+
+  /**
+   * Builds the request envelope asking for the named log to be deleted.
+   *
+   * @param logName name of the log to delete
+   * @return a {@link LogServiceRequestProto} with its {@code deleteLog} field set
+   */
+  public static LogServiceRequestProto toDeleteLogRequestProto(LogName logName) {
+    DeleteLogRequestProto deleteLog =
+        DeleteLogRequestProto.newBuilder().setLogName(toLogNameProto(logName)).build();
+    return LogServiceRequestProto.newBuilder().setDeleteLog(deleteLog).build();
+  }
+
+  /**
+   * Converts a protobuf log message into the API {@link LogMessage}.
+   *
+   * <p>The constructor is chosen from the fields that are populated: non-empty data
+   * wins, then a non-zero length, otherwise only the log name is passed.
+   *
+   * @param message the protobuf message to convert
+   * @return the corresponding API message
+   */
+  public static LogMessage toLogMessage(
+      org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage message) {
+    LogName logName = LogName.of(message.getLogName());
+    if (!message.getData().isEmpty()) {
+      return new LogMessage(logName, message.getData().toByteArray());
+    } else if (message.getLength() != 0) {
+      return new LogMessage(logName, message.getLength());
+    } else {
+      return new LogMessage(logName);
+    }
+  }
+
+  /**
+   * Converts an API log name into its protobuf form.
+   *
+   * @param logName the name to convert
+   * @return a {@link LogNameProto} carrying {@code logName.getName()}
+   */
+  public static LogNameProto toLogNameProto(LogName logName) {
+    return LogNameProto.newBuilder().setName(logName.getName()).build();
+  }
+
+  /**
+   * Converts a protobuf log name into the API {@link LogName}.
+   *
+   * @param logNameProto the protobuf name to convert
+   * @return the corresponding API name
+   */
+  public static LogName toLogName(LogNameProto logNameProto) {
+    return LogName.of(logNameProto.getName());
+  }
+
+  /**
+   * Converts an API log stream into its protobuf form (name, size and state).
+   *
+   * @param logStream the stream to convert
+   * @return the corresponding {@link LogStreamProto}
+   */
+  public static LogStreamProto toLogStreamProto(LogStream logStream) {
+    return LogStreamProto
+        .newBuilder()
+        .setLogName(toLogNameProto(logStream.getName()))
+        .setSize(logStream.getSize())
+        .setState(toLogStreamState(logStream.getState()))
+        .build();
+  }
+
+  /**
+   * Converts a protobuf log stream into a {@link BaseLogStream}.
+   *
+   * @param logStream the protobuf stream to convert
+   * @return a stream carrying the same name, state and size
+   */
+  public static LogStream toLogStream(LogStreamProto logStream) {
+    return new BaseLogStream(toLogName(logStream.getLogName()),
+        toState(logStream.getState()),
+        logStream.getSize());
+  }
+
+  /**
+   * Builds the reply to a create-log request.
+   *
+   * @param logStream the stream to describe in the reply
+   * @return a {@link CreateLogReplyProto} describing {@code logStream}
+   */
+  public static CreateLogReplyProto toCreateLogReplyProto(LogStream logStream) {
+    return CreateLogReplyProto.newBuilder().setLogStream(toLogStreamProto(logStream)).build();
+  }
+
+  /**
+   * Builds the reply to a list-logs request.
+   *
+   * @param logStreams the streams to include, in iteration order
+   * @return a {@link ListLogsReplyProto} with one entry per stream
+   */
+  public static ListLogsReplyProto toListLogLogsReplyProto(List<LogStream> logStreams) {
+    Builder newBuilder = ListLogsReplyProto.newBuilder();
+    for (LogStream stream : logStreams) {
+      // The accessor name follows the field name declared in Logservice.proto.
+      newBuilder.addLogStrems(toLogStreamProto(stream));
+    }
+    return newBuilder.build();
+  }
+
+  /**
+   * Converts a list of protobuf log streams into API log streams.
+   *
+   * @param logStreamProtos the protobuf streams to convert
+   * @return a new mutable list with one {@link LogStream} per input element, in order
+   */
+  public static List<LogStream> toListLogStreams(List<LogStreamProto> logStreamProtos) {
+    List<LogStream> logStreams = new ArrayList<>(logStreamProtos.size());
+    for (LogStreamProto proto : logStreamProtos) {
+      logStreams.add(toLogStream(proto));
+    }
+    return logStreams;
+  }
+
+  /**
+   * Builds the reply to a get-log request.
+   *
+   * @param logStream the stream to describe in the reply
+   * @return a {@link GetLogReplyProto} describing {@code logStream}
+   */
+  public static GetLogReplyProto toGetLogReplyProto(LogStream logStream) {
+    return GetLogReplyProto.newBuilder().setLogStream(toLogStreamProto(logStream)).build();
+  }
+
+  /**
+   * Builds the reply to a get-state request.
+   *
+   * @param exists {@code true} produces an OPEN state, {@code false} a CLOSED state
+   * @return a {@link GetStateReplyProto} carrying the chosen state
+   */
+  public static GetStateReplyProto toGetStateReplyProto(boolean exists) {
+    return GetStateReplyProto.newBuilder()
+        .setState(exists ? LogStreamState.OPEN : LogStreamState.CLOSED).build();
+  }
+
+  /** Maps an API stream state to the protobuf enum; anything other than OPEN becomes CLOSED. */
+  private static LogStreamState toLogStreamState(State state) {
+    return state.equals(State.OPEN) ? LogStreamState.OPEN : LogStreamState.CLOSED;
+  }
+
+  /** Maps a protobuf stream state to the API enum; anything other than OPEN becomes CLOSED. */
+  private static State toState(LogStreamState state) {
+    return state == LogStreamState.OPEN ? State.OPEN : State.CLOSED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
new file mode 100644
index 0000000..ecdc905
--- /dev/null
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.api.LogStateMachine;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogStream.State;
+import org.apache.ratis.statemachine.StateMachine;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base test for the LogService administrative API. Concrete subclasses choose the
+ * {@link MiniRaftCluster} flavor through {@link MiniRaftCluster.Factory.Get}.
+ */
+public abstract class LogServiceBaseTest<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(LogServiceBaseTest.class);
+
+  // Configure the cluster properties so that servers use the LogService state machine.
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        LogStateMachine.class, StateMachine.class);
+  }
+
+  static final int NUM_PEERS = 3;
+  CLUSTER cluster;
+
+  /** Starts a {@link #NUM_PEERS}-peer cluster and waits until a leader is elected. */
+  @Before
+  public void setUpCluster() throws IOException, InterruptedException {
+    cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+  }
+
+  /**
+   * Creates a log and checks that createLog, getLog, listLogs and getState all
+   * report it as a new, empty, open log.
+   */
+  @Test
+  public void testLogServiceAdminAPIs() throws Exception {
+    // Close the client when the test is done so its resources are released.
+    try (RaftClient raftClient = RaftClient.newBuilder()
+        .setProperties(getProperties())
+        .setRaftGroup(cluster.getGroup())
+        .build()) {
+      LogService logService = LogServiceFactory.getInstance().createLogService(raftClient);
+      LogName logName = LogName.of("log1");
+
+      LogStream logStream = logService.createLog(logName);
+      assertNewOpenLog(logStream);
+
+      // Re-assign so the assertions check what getLog returned, not the createLog result.
+      logStream = logService.getLog(logName);
+      assertNewOpenLog(logStream);
+
+      logStream = logService.listLogs().next();
+      assertNewOpenLog(logStream);
+
+      State state = logService.getState(logName);
+      assertEquals(State.OPEN, state);
+    }
+  }
+
+  /** Asserts that the stream is the freshly created, empty, open log named "log1". */
+  private static void assertNewOpenLog(LogStream logStream) {
+    assertEquals("log1", logStream.getName().getName());
+    assertEquals(State.OPEN, logStream.getState());
+    assertEquals(0, logStream.getSize());
+  }
+
+  /** Shuts the cluster down; tolerates a cluster that was never created. */
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithGrpc.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithGrpc.java
new file mode 100644
index 0000000..c3549a8
--- /dev/null
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithGrpc.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice;
+
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
+
+/**
+ * Runs the tests inherited from {@link LogServiceBaseTest} against a
+ * {@link MiniRaftClusterWithGrpc}; the class adds no tests of its own.
+ */
+public class TestLogServiceWithGrpc extends 
LogServiceBaseTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithNetty.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithNetty.java
new file mode 100644
index 0000000..e483627
--- /dev/null
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithNetty.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice;
+
+import org.apache.ratis.netty.MiniRaftClusterWithNetty;
+
+/**
+ * Runs the tests inherited from {@link LogServiceBaseTest} against a
+ * {@link MiniRaftClusterWithNetty}; the class adds no tests of its own.
+ */
+public class TestLogServiceWithNetty
+    extends LogServiceBaseTest<MiniRaftClusterWithNetty>
+    implements MiniRaftClusterWithNetty.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-proto/src/main/proto/Logservice.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Logservice.proto 
b/ratis-proto/src/main/proto/Logservice.proto
index 8ea32f2..40964e0 100644
--- a/ratis-proto/src/main/proto/Logservice.proto
+++ b/ratis-proto/src/main/proto/Logservice.proto
@@ -33,3 +33,83 @@ message LogMessage {
        uint64 length = 3;
        bytes  data = 4;
 }
+// Envelope for LogService requests. The oneof holds at most one of the request
+// types listed below.
+message LogServiceRequestProto {
+  oneof Request {
+    LogMessage logMessage = 1;
+    CreateLogRequestProto createLog = 2;
+    ListLogsRequestProto listLogs = 3;
+    GetLogRequestProto getLog = 4;
+    CloseLogRequestProto closeLog = 5;
+    GetStateRequestProto getState = 6;
+    ArchiveLogRequestProto archiveLog = 7;
+    DeleteLogRequestProto deleteLog = 8;
+  }
+}
+
+// Name of a log, carried as a plain string.
+message LogNameProto {
+  string name = 1;
+}
+
+// Request to create the log identified by logName.
+message CreateLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request to list logs.
+// NOTE(review): "logSreams" looks like a typo for "logStreams", and the client
+// helper toListLogRequestProto() builds this request without setting it —
+// confirm whether a request needs this field at all. Renaming it would change
+// the generated accessor names.
+message ListLogsRequestProto {
+  repeated LogStreamProto logSreams = 1;
+}
+
+// Request to fetch the log identified by logName.
+message GetLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request to close the log identified by logName.
+message CloseLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request for the state of the log identified by logName.
+message GetStateRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request to archive the log identified by logName.
+message ArchiveLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request to delete the log identified by logName.
+message DeleteLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Description of a log stream: its name, size and state.
+// NOTE(review): the unit of "size" is not stated in this file — confirm.
+message LogStreamProto {
+  LogNameProto logName = 1;
+  uint64 size = 2;
+  LogStreamState state = 3;
+}
+
+// State of a log stream.
+// NOTE(review): presumably this file uses proto3 syntax (fields carry no
+// labels); if so, OPEN (0) is what readers see when the field is unset — confirm.
+enum LogStreamState {
+  OPEN = 0;
+  CLOSED = 1;
+}
+
+// Reply to a create-log request, carrying a description of the log stream.
+message CreateLogReplyProto {
+  LogStreamProto logStream = 1;
+}
+
+// Reply to a get-log request, carrying a description of the log stream.
+message GetLogReplyProto {
+  LogStreamProto logStream = 1;
+}
+
+// Reply to a list-logs request, carrying one entry per log stream.
+// NOTE(review): "logStrems" looks like a typo for "logStreams". Renaming it
+// changes the generated accessors (e.g. addLogStrems), so callers must be
+// updated in the same change.
+message ListLogsReplyProto {
+  repeated LogStreamProto logStrems = 1;
+}
+
+// Reply to a close-log request; declares no fields.
+message CloseLogReplyProto {
+}
+
+// Reply to a get-state request, carrying the state of the log stream.
+message GetStateReplyProto {
+  LogStreamState state = 1;
+}
+
+// Reply to an archive-log request; declares no fields.
+message ArchiveLogReplyProto {
+}
+
+// Reply to a delete-log request; declares no fields.
+message DeleteLogReplyProto {
+}


Reply via email to