RATIS-334 Implement server membership for LogService Metadata Service

Signed-off-by: Josh Elser <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/85d8e025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/85d8e025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/85d8e025

Branch: refs/heads/master
Commit: 85d8e025ffe332dc1329fbecd41d841fbc136db8
Parents: cce03d0
Author: Sergey Soldatov <[email protected]>
Authored: Fri Oct 19 01:12:36 2018 -0700
Committer: Josh Elser <[email protected]>
Committed: Wed Oct 24 19:18:19 2018 -0400

----------------------------------------------------------------------
 ratis-logservice/pom.xml                        | 134 +++++++
 .../apache/ratis/logservice/api/LogInfo.java    |  49 +++
 .../apache/ratis/logservice/api/LogName.java    |   4 +-
 .../ratis/logservice/api/LogStateMachine.java   | 151 ++++----
 .../logservice/client/LogServiceClient.java     | 146 ++++++++
 .../ratis/logservice/common/Constants.java      |  31 ++
 .../common/LogAlreadyExistException.java        |  27 ++
 .../logservice/common/LogNotFoundException.java |  28 ++
 .../common/NoEnoughWorkersException.java        |  32 ++
 .../ratis/logservice/impl/LogReaderImpl.java    |   3 +-
 .../ratis/logservice/impl/LogServiceImpl.java   |  76 ++--
 .../ratis/logservice/impl/LogStreamImpl.java    |   4 +-
 .../ratis/logservice/impl/LogWriterImpl.java    |  18 +-
 .../server/ManagementStateMachine.java          |  29 ++
 .../ratis/logservice/server/MasterServer.java   | 177 +++++++++
 .../logservice/server/MetaStateMachine.java     | 371 +++++++++++++++++++
 .../logservice/util/LogServiceProtoUtil.java    | 160 ++------
 .../ratis/logservice/util/LogServiceUtils.java  |  54 +++
 .../logservice/util/MetaServiceProtoUtil.java   | 209 +++++++++++
 .../logservice/worker/LogServiceWorker.java     | 130 +++++++
 .../src/main/proto/LogService.proto             | 135 +++++++
 .../src/main/proto/MetaService.proto            | 143 +++++++
 .../src/main/resources/log4j.properties         |  23 ++
 .../ratis/logservice/LogServiceBaseTest.java    |   9 +-
 .../ratis/logservice/server/TestMetaServer.java | 200 ++++++++++
 .../logservice/util/LogServiceCluster.java      | 156 ++++++++
 .../util/TestLogServiceProtoUtil.java           | 109 +-----
 ratis-proto/src/main/proto/Logservice.proto     | 178 ---------
 28 files changed, 2224 insertions(+), 562 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-logservice/pom.xml b/ratis-logservice/pom.xml
index a3c114d..9556dcd 100644
--- a/ratis-logservice/pom.xml
+++ b/ratis-logservice/pom.xml
@@ -23,6 +23,135 @@
   <artifactId>ratis-logservice</artifactId>
   <name>Apache Ratis LogService</name>
 
+  <build>
+    <extensions>
+      <!-- Use os-maven-plugin to initialize the "os.detected" properties -->
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.5.0.Final</version>
+      </extension>
+    </extensions>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <compilerArgs>
+            <!-- disable all javac warnings for shaded sources -->
+            <arg>-Xlint:none</arg>
+            <arg>-XDignore.symbol.file</arg>
+          </compilerArgs>
+          <showWarnings>false</showWarnings>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration/>
+      </plugin>
+      <!-- Make a jar and put the sources in the jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <!--Make it so assembly:single does nothing in here-->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <configuration>
+          <includes>
+            <include>**/*.proto</include>
+          </includes>
+          <protocArtifact>
+            
com.google.protobuf:protoc:${shaded.protobuf.version}:exe:${os.detected.classifier}
+          </protocArtifact>
+          <!-- Place these in a location that compiler-plugin is already 
looking -->
+          
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
+          <!-- With multiple executions, this must be `false` otherwise we 
wipe out the previous execution -->
+          <clearOutputDirectory>false</clearOutputDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <id>compile-protobuf</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.google.code.maven-replacer-plugin</groupId>
+        <artifactId>replacer</artifactId>
+        <version>1.5.3</version>
+        <executions>
+          <execution>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>replace</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          
<basedir>${project.build.directory}/generated-sources/org/apache/ratis/logservice/proto</basedir>
+          <includes>
+            <include>**/*.java</include>
+          </includes>
+          <replacements>
+            <replacement>
+              <token>([^\.])com.google</token>
+              <value>$1org.apache.ratis.thirdparty.com.google</value>
+            </replacement>
+            <replacement>
+              <token>([^\.])io.grpc</token>
+              <value>$1org.apache.ratis.thirdparty.io.grpc</value>
+            </replacement>
+          </replacements>
+        </configuration>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings 
only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>
+                      com.google.code.maven-replacer-plugin
+                    </groupId>
+                    <artifactId>replacer</artifactId>
+                    <versionRange>
+                      [1.5.3,)
+                    </versionRange>
+                    <goals>
+                      <goal>replace</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
   <dependencies>
     <!-- Ratis dependencies -->
     <dependency>
@@ -112,5 +241,10 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+      <dependency>
+          <groupId>com.beust</groupId>
+          <artifactId>jcommander</artifactId>
+          <version>1.72</version>
+      </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogInfo.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogInfo.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogInfo.java
new file mode 100644
index 0000000..63a07d1
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogInfo.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.api;
+
+import org.apache.ratis.protocol.RaftGroup;
+
+/**
+ * This classs represent the pair of LogName -> RaftGroup
+ */
+public class LogInfo {
+    private final RaftGroup raftGroup;
+    private final LogName logName;
+    public LogInfo(LogName logName, RaftGroup raftGroup) {
+        this.logName = logName;
+        this.raftGroup = raftGroup;
+    }
+
+    /**
+     * Log name
+     * @return
+     */
+    public LogName getLogName() {
+        return logName;
+    }
+
+    /**
+     * Raft group
+     * @return
+     */
+    public RaftGroup getRaftGroup() {
+        return raftGroup;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java
index 3405340..00496dc 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java
@@ -21,7 +21,7 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.Objects;
 
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
@@ -83,7 +83,7 @@ public class LogName {
 
   public static LogName parseFrom(ByteString logName)
       throws InvalidProtocolBufferException {
-    LogNameProto logNameProto = LogNameProto.parseFrom(logName);
+    LogServiceProtos.LogNameProto logNameProto = 
LogServiceProtos.LogNameProto.parseFrom(logName);
     return new LogName(logNameProto.getName());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
index 1a3edc2..1617c94 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java
@@ -35,23 +35,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.ratis.logservice.impl.LogStreamImpl;
+import org.apache.ratis.logservice.proto.LogServiceProtos.*;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogRequestProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
@@ -203,10 +190,6 @@ public class LogStateMachine extends BaseStateMachine {
           return processGetLengthRequest(logServiceRequestProto);
         case STARTINDEXQUERY:
           return processGetStartIndexRequest(logServiceRequestProto);
-        case LISTLOGS:
-          return processListLogsRequest();
-        case GETLOG:
-          return processGetLogRequest(logServiceRequestProto);
         case GETSTATE:
           return processGetStateRequest(logServiceRequestProto);
         default:
@@ -344,14 +327,14 @@ public class LogStateMachine extends BaseStateMachine {
           
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
       switch (logServiceRequestProto.getRequestCase()) {
 
-        case CREATELOG:
-          return processCreateLogRequest(logServiceRequestProto);
-        case ARCHIVELOG:
-          return processArchiveLog(logServiceRequestProto);
+//        case CREATELOG:
+//          return processCreateLogRequest(logServiceRequestProto);
+//        case ARCHIVELOG:
+//          return processArchiveLog(logServiceRequestProto);
         case CLOSELOG:
           return processCloseLog(logServiceRequestProto);
-        case DELETELOG:
-          return processDeleteLog(logServiceRequestProto);
+//        case DELETELOG:
+//          return processDeleteLog(logServiceRequestProto);
         case APPENDREQUEST:
           return processAppendRequest(trx, logServiceRequestProto);
         case SYNCREQUEST:
@@ -366,17 +349,17 @@ public class LogStateMachine extends BaseStateMachine {
     }
   }
 
-  private CompletableFuture<Message>
-      processDeleteLog(LogServiceRequestProto logServiceRequestProto) {
-    DeleteLogRequestProto deleteLog = logServiceRequestProto.getDeleteLog();
-    LogName logName = LogServiceProtoUtil.toLogName(deleteLog.getLogName());
-    try (final AutoCloseableLock writeLock = writeLock()) {
-      state.remove(logName);
-    }
-    // TODO need to handle exceptions while operating with files.
-    return CompletableFuture.completedFuture(Message
-      .valueOf(DeleteLogReplyProto.newBuilder().build().toByteString()));
-  }
+//  private CompletableFuture<Message>
+//      processDeleteLog(LogServiceRequestProto logServiceRequestProto) {
+//    DeleteLogRequestProto deleteLog = logServiceRequestProto.getDeleteLog();
+//    LogName logName = LogServiceProtoUtil.toLogName(deleteLog.getLogName());
+//    try (final AutoCloseableLock writeLock = writeLock()) {
+//      state.remove(logName);
+//    }
+//    // TODO need to handle exceptions while operating with files.
+//    return CompletableFuture.completedFuture(Message
+//      .valueOf(DeleteLogReplyProto.newBuilder().build().toByteString()));
+//  }
 
   private CompletableFuture<Message> processCloseLog(LogServiceRequestProto 
logServiceRequestProto) {
     CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
@@ -387,14 +370,14 @@ public class LogStateMachine extends BaseStateMachine {
       .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
   }
 
-  private CompletableFuture<Message>
-      processArchiveLog(LogServiceRequestProto logServiceRequestProto) {
-    ArchiveLogRequestProto archiveLog = logServiceRequestProto.getArchiveLog();
-    LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
-    // Handle log archiving.
-    return CompletableFuture.completedFuture(Message
-      .valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString()));
-  }
+//  private CompletableFuture<Message>
+//      processArchiveLog(LogServiceRequestProto logServiceRequestProto) {
+//    ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
+//    LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+//    // Handle log archiving.
+//    return CompletableFuture.completedFuture(Message
+//      .valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString()));
+//  }
 
   private CompletableFuture<Message> processGetStateRequest(
       LogServiceRequestProto logServiceRequestProto) {
@@ -404,48 +387,48 @@ public class LogStateMachine extends BaseStateMachine {
         .toGetStateReplyProto(state.containsKey(logName)).toByteString()));
   }
 
-  private CompletableFuture<Message> processCreateLogRequest(
-      LogServiceRequestProto logServiceRequestProto) {
-    Long val;
-    LogName name;
-    try (final AutoCloseableLock writeLock = writeLock()) {
-      CreateLogRequestProto createLog = logServiceRequestProto.getCreateLog();
-      name = LogServiceProtoUtil.toLogName(createLog.getLogName());
-      val = state.get(name);
-      if (val == null) {
-        val = new Long(0);
-      }
-      state.put(name, val);
-    }
-    //TODO This can't be part of a state machine (REMOVE)
-    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-        .toCreateLogReplyProto(
-          new LogStreamImpl(name, null, new 
LogServiceConfiguration())).toByteString()));
-  }
-
-  //TODO REMOVE this code
-  private CompletableFuture<Message> processListLogsRequest() {
-    List<LogStream> logStreams = new ArrayList<LogStream>(state.size());
-    for (Entry<LogName, Long> e : state.entrySet()) {
-      logStreams.add(new LogStreamImpl(e.getKey(), null, new 
LogServiceConfiguration()));
-    }
-    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-        .toListLogLogsReplyProto(logStreams).toByteString()));
-  }
+//  private CompletableFuture<Message> processCreateLogRequest(
+//      LogServiceRequestProto logServiceRequestProto) {
+//    Long val;
+//    LogName name;
+//    try (final AutoCloseableLock writeLock = writeLock()) {
+//      CreateLogRequestProto createLog = 
logServiceRequestProto.getCreateLog();
+//      name = LogServiceProtoUtil.toLogName(createLog.getLogName());
+//      val = state.get(name);
+//      if (val == null) {
+//        val = new Long(0);
+//      }
+//      state.put(name, val);
+//    }
+//    //TODO This can't be part of a state machine (REMOVE)
+//    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
+//        .toCreateLogReplyProto(
+//          new LogStreamImpl(name, null, new 
LogServiceConfiguration())).toByteString()));
+//  }
+
+//  //TODO REMOVE this code
+//  private CompletableFuture<Message> processListLogsRequest() {
+//    List<LogStream> logStreams = new ArrayList<LogStream>(state.size());
+//    for (Entry<LogName, Long> e : state.entrySet()) {
+//      logStreams.add(new LogStreamImpl(e.getKey(), null, new 
LogServiceConfiguration()));
+//    }
+//    return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
+//        .toListLogLogsReplyProto(logStreams).toByteString()));
+//  }
 
   //TODO REMOVE this code
 
-  private CompletableFuture<Message> processGetLogRequest(
-      LogServiceRequestProto logServiceRequestProto) {
-    GetLogRequestProto getLog = logServiceRequestProto.getGetLog();
-    LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName());
-    if (state.containsKey(logName)) {
-      return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
-          .toGetLogReplyProto(new LogStreamImpl(logName, null, new 
LogServiceConfiguration()))
-          .toByteString()));
-    } else {
-      return 
CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder()
-          .build().toByteString()));
-    }
-  }
+//  private CompletableFuture<Message> processGetLogRequest(
+//      LogServiceRequestProto logServiceRequestProto) {
+//    GetLogRequestProto getLog = logServiceRequestProto.getGetLog();
+//    LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName());
+//    if (state.containsKey(logName)) {
+//      return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
+//          .toGetLogReplyProto(new LogStreamImpl(logName, null, new 
LogServiceConfiguration()))
+//          .toByteString()));
+//    } else {
+//      return 
CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder()
+//          .build().toByteString()));
+//    }
+//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
new file mode 100644
index 0000000..8ceaafb
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.client;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.logservice.LogServiceFactory;
+import org.apache.ratis.logservice.api.LogInfo;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogService;
+import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
+import org.apache.ratis.protocol.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.ratis.logservice.util.LogServiceUtils.getPeersFromQuorum;
+
+
+/**
+ * LogServiceClient is responsible for all meta service communications such as 
create/get/list logs.
+ * Initialized by the metaQuorum string that has list of masters as 
"server:port' separated by a comma.
+ * An example: 
'server1.example.com:9999,server2.example.com:9999,server3.example.com:9999
+ */
+
+public class LogServiceClient implements AutoCloseable {
+
+
+    // the raft client for meta quorum. All DML operations are going using 
this client.
+    final private RaftClient client;
+
+
+    /**
+     * Constuctor. Build raft client for meta quorum
+     * @param metaQuorum
+     */
+    public LogServiceClient(String metaQuorum) {
+        Set<RaftPeer> peers = getPeersFromQuorum(metaQuorum);
+        RaftProperties properties = new RaftProperties();
+        RaftGroup meta = RaftGroup.valueOf(Constants.metaGroupID, peers);
+        client = RaftClient.newBuilder()
+                .setRaftGroup(meta)
+                .setClientId(ClientId.randomId())
+                .setProperties(properties)
+                .build();
+    }
+
+    /**
+     * Create a new Log request.
+     * @param logName the name of the log to create
+     * @return
+     * @throws IOException
+     */
+    public LogService createLog(LogName logName) throws IOException {
+        RaftClientReply reply = client.sendReadOnly(
+                () -> 
MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString());
+        CreateLogReplyProto message = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
+        if (message.hasException()) {
+            throw 
MetaServiceProtoUtil.toMetaServiceException(message.getException());
+        }
+        LogInfo info = MetaServiceProtoUtil.toLogInfo(message.getLog());
+        return 
LogServiceFactory.getInstance().createLogService(getRaftClient(info), null);
+    }
+
+    /**
+     * Get log request.
+     * @param logName the name of the log to get
+     * @return
+     * @throws IOException
+     */
+    public LogService getLog(LogName logName) throws IOException {
+        RaftClientReply reply = client.sendReadOnly
+                (() -> 
MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString());
+        GetLogReplyProto message = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
+        if(message.hasException()) {
+            throw 
MetaServiceProtoUtil.toMetaServiceException(message.getException());
+        }
+        LogInfo info = MetaServiceProtoUtil.toLogInfo(message.getLog());
+        return 
LogServiceFactory.getInstance().createLogService(getRaftClient(info), null);
+    }
+
+
+    public void deleteLog(LogName logName) throws IOException {
+        RaftClientReply reply = client.sendReadOnly
+                (() -> 
MetaServiceProtoUtil.toDeleteLogRequestProto(logName).toByteString());
+        DeleteLogReplyProto message = 
DeleteLogReplyProto.parseFrom(reply.getMessage().getContent());
+        if(message.hasException()) {
+            throw 
MetaServiceProtoUtil.toMetaServiceException(message.getException());
+        }
+    }
+
+    /**
+     * Return the list of available logs
+     * @return
+     * @throws IOException
+     */
+    public List<LogInfo> listLogs() throws IOException {
+        RaftClientReply reply = client.sendReadOnly
+                (() -> 
MetaServiceProtoUtil.toListLogRequestProto().toByteString());
+        ListLogsReplyProto message = 
ListLogsReplyProto.parseFrom(reply.getMessage().getContent());
+        List<LogInfoProto> infoProtos = message.getLogsList();
+        List<LogInfo> infos = infoProtos.stream()
+                .map(proto -> MetaServiceProtoUtil.toLogInfo(proto))
+                .collect(Collectors.toList());
+        return infos;
+    }
+
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+
+    // Internal methods
+
+    /**
+     * Build a raft client for the particular log. Temporary here. TODO: 
Should be moved to LogService part
+     * @param logInfo
+     * @return
+     */
+    private RaftClient getRaftClient(LogInfo logInfo) {
+        RaftProperties properties = new RaftProperties();
+        return 
RaftClient.newBuilder().setRaftGroup(logInfo.getRaftGroup()).setProperties(properties).build();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
new file mode 100644
index 0000000..ba50154
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.common;
+
+import org.apache.ratis.protocol.RaftGroupId;
+
+import java.util.UUID;
+
+public class Constants {
+
+    final public static RaftGroupId metaGroupID = RaftGroupId.valueOf(new 
UUID(0,1));
+
+    final public static RaftGroupId serversGroupID = RaftGroupId.valueOf(new 
UUID(0,2));
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogAlreadyExistException.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogAlreadyExistException.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogAlreadyExistException.java
new file mode 100644
index 0000000..74a77ba
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogAlreadyExistException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.common;
+
+import java.io.IOException;
+
+public class LogAlreadyExistException extends IOException {
+    public LogAlreadyExistException(String name) {
+        super(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogNotFoundException.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogNotFoundException.java
new file mode 100644
index 0000000..f9d672a
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/LogNotFoundException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.common;
+
+import java.io.IOException;
+
+public class LogNotFoundException extends IOException {
+
+    public LogNotFoundException(String logName) {
+        super(logName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/NoEnoughWorkersException.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/NoEnoughWorkersException.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/NoEnoughWorkersException.java
new file mode 100644
index 0000000..163b771
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/NoEnoughWorkersException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.common;
+
+import java.io.IOException;
+
+public class NoEnoughWorkersException extends IOException {
+
+    public NoEnoughWorkersException(String message) {
+        super(message);
+    }
+
+    public NoEnoughWorkersException(int available) {
+        this("No enough Workers to create a new Log. Currently available 
workers: " + available);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
index ae4f2de..3e1ea4d 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
@@ -26,9 +26,8 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.logservice.api.LogReader;
 import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.proto.LogServiceProtos.*;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogReplyProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
index 681a4ab..613ec5e 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java
@@ -28,16 +28,8 @@ import 
org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
 import org.apache.ratis.logservice.api.RecordListener;
+import org.apache.ratis.logservice.proto.LogServiceProtos.*;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 
@@ -53,32 +45,33 @@ public class LogServiceImpl implements LogService {
 
   @Override
   public LogStream createLog(LogName name) throws IOException {
-    RaftClientReply reply =
-        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name)
-            .toByteString()));
-    CreateLogReplyProto parseFrom = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
-    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
+//    RaftClientReply reply =
+//        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name)
+//            .toByteString()));
+//    CreateLogReplyProto parseFrom = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
+//    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
+    return new LogStreamImpl(name, this);
   }
 
-
-
   @Override
   public LogStream getLog(LogName name) throws IOException {
-    RaftClientReply reply =
-        
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name)
-            .toByteString()));
-    GetLogReplyProto parseFrom = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
-    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
+//    RaftClientReply reply =
+//        
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name)
+//            .toByteString()));
+//    GetLogReplyProto parseFrom = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
+//    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
+    return null;
   }
 
   @Override
   public Iterator<LogStream> listLogs() throws IOException {
-    RaftClientReply reply =
-        raftClient
-            
.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString()));
-    ListLogsReplyProto parseFrom = 
ListLogsReplyProto.parseFrom(reply.getMessage().getContent());
-    List<LogStreamProto> logStremsList = parseFrom.getLogStremsList();
-    return LogServiceProtoUtil.toListLogStreams(logStremsList, 
this).iterator();
+//    RaftClientReply reply =
+//        raftClient
+//            
.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString()));
+//    ListLogsReplyProto parseFrom = 
ListLogsReplyProto.parseFrom(reply.getMessage().getContent());
+//    List<LogStreamProto> logStremsList = parseFrom.getLogStremsList();
+//    return LogServiceProtoUtil.toListLogStreams(logStremsList, 
this).iterator();
+    return null;
   }
 
   @Override
@@ -100,19 +93,19 @@ public class LogServiceImpl implements LogService {
 
   @Override
   public void archiveLog(LogName name) throws IOException {
-    RaftClientReply reply =
-        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toArchiveLogRequestProto(name)
-            .toByteString()));
-    ArchiveLogReplyProto parseFrom =
-        ArchiveLogReplyProto.parseFrom(reply.getMessage().getContent());
+//    RaftClientReply reply =
+//        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toArchiveLogRequestProto(name)
+//            .toByteString()));
+//    ArchiveLogReplyProto parseFrom =
+//        ArchiveLogReplyProto.parseFrom(reply.getMessage().getContent());
   }
 
   @Override
   public void deleteLog(LogName name) throws IOException {
-    RaftClientReply reply =
-        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toDeleteLogRequestProto(name)
-            .toByteString()));
-    DeleteLogReplyProto parseFrom = 
DeleteLogReplyProto.parseFrom(reply.getMessage().getContent());
+//    RaftClientReply reply =
+//        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toDeleteLogRequestProto(name)
+//            .toByteString()));
+//    DeleteLogReplyProto parseFrom = 
DeleteLogReplyProto.parseFrom(reply.getMessage().getContent());
   }
 
 
@@ -137,11 +130,12 @@ public class LogServiceImpl implements LogService {
   @Override
   public LogStream createLog(LogName name, LogServiceConfiguration config) 
throws IOException {
     //TODO configuration
-    RaftClientReply reply =
-        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name)
-            .toByteString()));
-    CreateLogReplyProto parseFrom = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
-    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
+//    RaftClientReply reply =
+//        
raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name)
+//            .toByteString()));
+//    CreateLogReplyProto parseFrom = 
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
+//    return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this);
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index 06a28f5..2c7feac 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -29,7 +29,7 @@ import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.api.RecordListener;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,7 +62,7 @@ public class LogStreamImpl implements LogStream {
    */
   long length;
 
-  public LogStreamImpl(LogStreamProto proto, LogService service) {
+  public LogStreamImpl(LogServiceProtos.LogStreamProto proto, LogService 
service) {
     this.service = service;
     this.name = LogName.of(proto.getLogName().getName());
     this.config = service.getConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
index 92082ab..da19e70 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
@@ -21,15 +21,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.logservice.api.LogServiceConfiguration;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException;
-import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogReplyProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.*;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.slf4j.Logger;
@@ -62,9 +61,16 @@ public class LogWriterImpl implements LogWriter {
     List<ByteBuffer> list = new ArrayList<ByteBuffer>();
     list.add(data);
     RaftClientReply reply =
-        raftClient.send(Message.valueOf(LogServiceProtoUtil
-            .toAppendBBEntryLogRequestProto(parent.getName(), list)
-            .toByteString()));
+            null;
+    try {
+      reply = raftClient.sendAsync(Message.valueOf(LogServiceProtoUtil
+          .toAppendBBEntryLogRequestProto(parent.getName(), list)
+          .toByteString())).get();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
     AppendLogEntryReplyProto proto = 
AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent());
     if (proto.hasException()) {
       LogServiceException e = proto.getException();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ManagementStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ManagementStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ManagementStateMachine.java
new file mode 100644
index 0000000..2b018bb
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ManagementStateMachine.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.server;
+
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+
+
+/**
+ * This is the statemachine for the default group serversGroupID. At the 
moment it's empty, but would
+ * have some logic related to the heartbeat functionality.
+ */
+public class ManagementStateMachine extends BaseStateMachine {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java
new file mode 100644
index 0000000..a0157c3
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.server;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.LifeCycle;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.*;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.ratis.logservice.common.Constants.metaGroupID;
+import static 
org.apache.ratis.logservice.util.LogServiceUtils.getPeersFromQuorum;
+
+/**
+ * Master quorum is responsible for tracking all available quorum members
+ */
+public class MasterServer implements Closeable {
+
+
+    // RaftServer internal server. Has meta raft group and MetaStateMachine
+    private  RaftServer server;
+
+    private String id;
+
+    private String host;
+
+    @Parameter(names = "-port", description = "Port number")
+    private int port = 9999;
+
+    private String workingDir = null;
+
+    private StateMachine metaStateMachine;
+
+    private LifeCycle lifeCycle;
+
+    public MasterServer(String hostname, int port, String workingDir) {
+        this.port = port;
+        this.host = hostname;
+        this.workingDir = workingDir;
+        id = host + "_" + port;
+        this.lifeCycle = new LifeCycle(this.id);
+
+    }
+
+    public MasterServer() {
+
+    }
+
+    public void start(String metaGroupId) throws IOException  {
+        if (host == null) {
+            host = LogServiceUtils.getHostName();
+        }
+        this.lifeCycle = new LifeCycle(this.id);
+        RaftProperties properties = new RaftProperties();
+        if(workingDir != null) {
+            RaftServerConfigKeys.setStorageDirs(properties, 
Collections.singletonList(new File(workingDir)));
+        }
+        GrpcConfigKeys.Server.setPort(properties, port);
+        NettyConfigKeys.Server.setPort(properties, port);
+        Set<RaftPeer> peers = getPeersFromQuorum(metaGroupId);
+        RaftGroup metaGroup = RaftGroup.valueOf(Constants.metaGroupID, peers);
+        metaStateMachine = new MetaStateMachine();
+        server = RaftServer.newBuilder()
+                .setGroup(metaGroup)
+                .setServerId(RaftPeerId.valueOf(id))
+                .setStateMachineRegistry(raftGroupId -> {
+                    if(raftGroupId.equals(metaGroupID)) {
+                        return metaStateMachine;
+                    }
+                    return null;
+                })
+                .setProperties(properties).build();
+        lifeCycle.startAndTransition(() -> {
+            server.start();
+        }, IOException.class);
+    }
+
+    public static void main(String[] args) throws IOException {
+        MasterServer master = new MasterServer();
+        JCommander.newBuilder()
+                .addObject(master)
+                .build()
+                .parse(args);
+        master.start(null);
+
+
+    }
+    public static MasterServer.Builder newBuilder() {
+        return new MasterServer.Builder();
+    }
+
+    @Override
+    public void close() throws IOException {
+        server.close();
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public String getAddress() {
+        return host + ":" + port;
+    }
+
+    public void cleanUp() throws IOException {
+        FileUtils.deleteFully(new File(workingDir));
+    }
+
+    public static class Builder {
+        private String host = null;
+        private int port = 9999;
+        private String workingDir = null;
+
+        /**
+         * @return a {@link MasterServer} object.
+         */
+        public MasterServer build()  {
+            if (host == null) {
+                host = LogServiceUtils.getHostName();
+            }
+            return new MasterServer(host, port, workingDir);
+        }
+
+        /**
+         * Set the server hostname.
+         */
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        /**
+         * Set server port
+         */
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        public Builder setWorkingDir(String workingDir) {
+            this.workingDir = workingDir;
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
new file mode 100644
index 0000000..ab614d9
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.server;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogInfo;
+import org.apache.ratis.logservice.common.LogAlreadyExistException;
+import org.apache.ratis.logservice.common.LogNotFoundException;
+import org.apache.ratis.logservice.common.NoEnoughWorkersException;
+import org.apache.ratis.logservice.proto.MetaServiceProtos;
+import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.ratis.logservice.common.Constants.metaGroupID;
+import static org.apache.ratis.logservice.common.Constants.serversGroupID;
+
+/**
+ * State Machine serving meta data for LogService. It persists the pairs 'log 
name' -> RaftGroup
+ * During the start basing on the persisted data it would be able to build a 
list of the existing servers.
+ * Requests from clients for DDL operations are handled by query mechanism (so 
only the leader accept that.
+ * It performs the operation (such as Log creation) and sends a message with 
the log -> group pair to itself
+ * to persis this data internally and on followers.
+ */
+
+public class MetaStateMachine extends BaseStateMachine {
+
+    Logger LOG = LoggerFactory.getLogger(MetaStateMachine.class);
+
+
+    //Persisted map between log and RaftGroup
+    private Map<LogName, RaftGroup> map = new ConcurrentHashMap<>();
+    // List of the currently known peers.
+    private final Set<RaftPeer> peers = new HashSet();
+
+    // keep a copy of raftServer to get group information.
+    private RaftServer raftServer;
+
+
+    private RaftGroup currentGroup = null;
+
+    // MinHeap queue for load balancing groups across the peers
+    private PriorityBlockingQueue<PeerGroups> avail = new 
PriorityBlockingQueue<PeerGroups>();
+
+    //Properties
+    private RaftProperties properties = new RaftProperties();
+
+    private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
+
+
+    @Override
+    public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage 
storage) throws IOException {
+        this.raftServer = server;
+        super.initialize(server, groupId, storage);
+    }
+
+    @Override
+    public TransactionContext applyTransactionSerial(TransactionContext trx) {
+        RaftProtos.LogEntryProto x = trx.getLogEntry();
+        MetaSMRequestProto req = null;
+        try {
+            req = 
MetaSMRequestProto.parseFrom(x.getStateMachineLogEntry().getLogData());
+        } catch (InvalidProtocolBufferException e) {
+            e.printStackTrace();
+        }
+        switch (req.getTypeCase()) {
+            case REGISTERREQUEST:
+                LogServiceRegisterLogRequestProto r = req.getRegisterRequest();
+                LogName logname = 
LogServiceProtoUtil.toLogName(r.getLogname());
+                RaftGroup rg = 
MetaServiceProtoUtil.toRaftGroup(r.getRaftGroup());
+                map.put(logname, rg);
+                LOG.info("Log {} registered at {} with group {} ", logname, 
getId(), rg );
+                break;
+            case UNREGISTERREQUEST:
+                LogServiceUnregisterLogRequestProto unregReq = 
req.getUnregisterRequest();
+                logname = LogServiceProtoUtil.toLogName(unregReq.getLogname());
+                map.remove(logname);
+                break;
+            case PINGREQUEST:
+                LogServicePingRequestProto pingRequest = req.getPingRequest();
+                RaftPeer peer = 
MetaServiceProtoUtil.toRaftPeer(pingRequest.getPeer());
+                if (peers.contains(peer)) {
+                    //Do Nothing, that's just heartbeat
+                } else {
+                    peers.add(peer);
+                    avail.add(new PeerGroups(peer));
+                }
+                break;
+
+            default:
+        }
+        return super.applyTransactionSerial(trx);
+    }
+
+    @Override
+    public TransactionContext startTransaction(RaftClientRequest request) 
throws IOException {
+        return super.startTransaction(request);
+    }
+
+    @Override
+    public TransactionContext preAppendTransaction(TransactionContext trx) 
throws IOException {
+        return super.preAppendTransaction(trx);
+    }
+
+    @Override
+    public CompletableFuture<Message> queryStale(Message request, long 
minIndex) {
+        return super.queryStale(request, minIndex);
+    }
+
+    @Override
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) 
{
+        return super.applyTransaction(trx);
+    }
+
+    @Override
+    public CompletableFuture<Message> query(Message request) {
+        if (currentGroup == null) {
+            try {
+                List<RaftGroup> x = 
StreamSupport.stream(raftServer.getGroups().spliterator(), false).filter(group 
-> group.getGroupId().equals(metaGroupID)).collect(Collectors.toList());
+                if (x.size() == 1) {
+                    currentGroup = x.get(0);
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        RaftProperties properties = new RaftProperties();
+        MetaServiceProtos.MetaServiceRequestProto req = null;
+        try {
+            req =  
MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
+        } catch (InvalidProtocolBufferException e) {
+            e.printStackTrace();
+        }
+        MetaServiceProtos.MetaServiceRequestProto.TypeCase type = 
req.getTypeCase();
+        switch (type) {
+
+            case CREATELOG:
+                return processCreateLogRequest(req);
+            case LISTLOGS:
+                return processListLogsRequest();
+            case GETLOG:
+                return processGetLogRequest(req);
+            case ARCHIVELOG:
+                return processArchiveLog(req);
+            case DELETELOG:
+                return processDeleteLog(req);
+                default:
+        }
+        CompletableFuture<Message> reply = super.query(request);
+        return reply;
+    }
+
+
+
+    private CompletableFuture<Message>
+    processDeleteLog(MetaServiceProtos.MetaServiceRequestProto 
logServiceRequestProto) {
+        DeleteLogRequestProto deleteLog = 
logServiceRequestProto.getDeleteLog();
+        LogName logName = 
LogServiceProtoUtil.toLogName(deleteLog.getLogName());
+        RaftGroup raftGroup = map.get(logName);
+        if (raftGroup == null) {
+            return CompletableFuture.completedFuture(Message.valueOf(
+                    MetaServiceProtoUtil.toDeleteLogExceptionReplyProto(
+                            new 
LogNotFoundException(logName.getName())).build().toByteString()));
+        } else {
+            Collection<RaftPeer> peers = raftGroup.getPeers();
+            peers.stream().forEach(peer -> {
+                RaftClient client = RaftClient.newBuilder()
+                        .setProperties(properties)
+                        .setClientId(ClientId.randomId())
+                        .setRaftGroup(RaftGroup.valueOf(serversGroupID, peer))
+                        .build();
+                try {
+                    client.groupRemove(raftGroup.getGroupId(), true, 
peer.getId());
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            });
+            RaftClient client = RaftClient.newBuilder()
+                    .setRaftGroup(currentGroup)
+                    .setClientId(ClientId.randomId())
+                    .setProperties(properties)
+                    .build();
+            try {
+                client.send(() -> 
MetaServiceProtos.MetaSMRequestProto.newBuilder()
+                        .setUnregisterRequest(
+                                
LogServiceUnregisterLogRequestProto.newBuilder()
+                                        
.setLogname(LogServiceProtoUtil.toLogNameProto(logName)))
+                        .build().toByteString());
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        return CompletableFuture.completedFuture(Message.valueOf(
+                MetaServiceProtoUtil.toDeleteLogReplyProto().toByteString()));
+    }
+
+//    private CompletableFuture<Message> 
processCloseLog(MetaServiceProtos.MetaServiceRequestProto 
logServiceRequestProto) {
+//        CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+//        LogName logName = 
LogServiceProtoUtil.toLogName(closeLog.getLogName());
+//        // Need to check whether the file is opened if opened close it.
+//        // TODO need to handle exceptions while operating with files.
+//        return CompletableFuture.completedFuture(Message
+//                
.valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+//    }
+
+    private CompletableFuture<Message>
+    processArchiveLog(MetaServiceProtos.MetaServiceRequestProto 
logServiceRequestProto) {
+        ArchiveLogRequestProto archiveLog = 
logServiceRequestProto.getArchiveLog();
+        LogName logName = 
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+        // Handle log archiving.
+        return CompletableFuture.completedFuture(Message
+                
.valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString()));
+    }
+
+//    private CompletableFuture<Message> processGetStateRequest(
+//            MetaServiceProtos.MetaServiceRequestProto 
logServiceRequestProto) {
+//        MetaServiceProtos.GetStateRequestProto getState = 
logServiceRequestProto.getGetState();
+//        LogName logName = 
LogServiceProtoUtil.toLogName(getState.getLogName());
+//        return 
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
+//                .toGetStateReplyProto(true).toByteString()));
+//    }
+//
+    private CompletableFuture<Message> processCreateLogRequest(
+            MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) {
+        LogName name;
+        try (final AutoCloseableLock writeLock = writeLock()) {
+            CreateLogRequestProto createLog = 
logServiceRequestProto.getCreateLog();
+            name = LogServiceProtoUtil.toLogName(createLog.getLogName());
+            if(map.containsKey(name)) {
+                return 
CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
+                        .toCreateLogExceptionReplyProto(new 
LogAlreadyExistException(name.getName()))
+                        .build()
+                        .toByteString()));
+            }
+            // Check that we have at least 3 nodes
+            if (avail.size() < 3) {
+                return 
CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
+                .toCreateLogExceptionReplyProto(new 
NoEnoughWorkersException(avail.size()))
+                        .build()
+                        .toByteString()));
+            } else {
+                List<PeerGroups> peerGroup = IntStream.range(0, 3).mapToObj(i 
-> avail.poll()).collect(Collectors.toList());
+                List<RaftPeer> peers = peerGroup.stream().map(obj -> 
obj.getPeer()).collect(Collectors.toList());
+                RaftGroup raftGroup = 
RaftGroup.valueOf(RaftGroupId.randomId(), peers);
+                peerGroup.stream().forEach(pg -> {
+                    pg.getGroups().add(raftGroup);
+                    avail.add(pg);
+                });
+                peers.forEach(i -> {
+                    RaftClient client = 
RaftClient.newBuilder().setProperties(properties).setRaftGroup(RaftGroup.valueOf(serversGroupID,
 i)).build();
+                    try {
+                        client.groupAdd(raftGroup, i.getId());
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                });
+                RaftClient client = RaftClient.newBuilder()
+                        .setRaftGroup(currentGroup)
+                        .setClientId(ClientId.randomId())
+                        .setProperties(properties)
+                        .build();
+                try {
+                    client.send(() -> 
MetaServiceProtos.MetaSMRequestProto.newBuilder()
+                            
.setRegisterRequest(LogServiceRegisterLogRequestProto
+                                    .newBuilder()
+                                    
.setLogname(LogServiceProtoUtil.toLogNameProto(name))
+                                    .setRaftGroup(MetaServiceProtoUtil
+                                            .toRaftGroupProto(raftGroup)))
+                            .build().toByteString());
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+                return 
CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
+                        .toCreateLogReplyProto(new LogInfo((name), 
raftGroup)).build().toByteString()));
+            }
+        }
+    }
+
+
+    private AutoCloseableLock writeLock() {
+        return AutoCloseableLock.acquire(lock.writeLock());
+    }
+
+    private CompletableFuture<Message> processListLogsRequest() {
+        return 
CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
+                .toListLogLogsReplyProto(
+                        map.entrySet()
+                                .stream()
+                                .map(log -> new LogInfo(log.getKey(), 
log.getValue()))
+                                
.collect(Collectors.toList())).toByteString()));
+    }
+
+    private CompletableFuture<Message> processGetLogRequest(
+            MetaServiceProtos.MetaServiceRequestProto logServiceRequestProto) {
+        MetaServiceProtos.GetLogRequestProto getLog = 
logServiceRequestProto.getGetLog();
+        LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName());
+        RaftGroup raftGroup = map.get(logName);
+        if (raftGroup != null) {
+            return CompletableFuture.completedFuture(Message.valueOf(
+                    MetaServiceProtoUtil.toGetLogReplyProto(new 
LogInfo(logName, raftGroup))
+                            .toByteString()));
+        } else {
+            return CompletableFuture.completedFuture(Message.valueOf(
+                    MetaServiceProtoUtil.toGetLogExceptionReplyProto(
+                            new 
LogNotFoundException(logName.getName())).build().toByteString()));
+        }
+    }
+
+
+    class PeerGroups implements Comparable{
+        RaftPeer peer;
+        Set<RaftGroup> groups = new HashSet<>();
+
+        public PeerGroups(RaftPeer peer) {
+            this.peer = peer;
+
+        }
+
+        public Set<RaftGroup> getGroups () {
+            return groups;
+        }
+
+        public RaftPeer getPeer() {
+            return peer;
+        }
+
+        @Override
+        public int compareTo(Object o) {
+            return groups.size() - ((PeerGroups) o).groups.size();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 2e1b8da..59037b8 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ratis.logservice.util;
 
 import java.nio.ByteBuffer;
@@ -27,131 +28,32 @@ import org.apache.ratis.logservice.api.LogService;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
 import org.apache.ratis.logservice.impl.LogStreamImpl;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogStartIndexReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.GetLogStartIndexRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto.Builder;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException;
-import 
org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogRequestProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogReplyProto;
-import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogRequestProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
+import org.apache.ratis.logservice.proto.LogServiceProtos.*;
+import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 public class LogServiceProtoUtil {
-  public static LogServiceRequestProto toCreateLogRequestProto(LogName 
logName) {
-    LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
-    CreateLogRequestProto createLog =
-        CreateLogRequestProto.newBuilder().setLogName(logNameProto).build();
-    return LogServiceRequestProto.newBuilder().setCreateLog(createLog).build();
-  }
-
-  public static LogServiceRequestProto toListLogRequestProto() {
-    ListLogsRequestProto listLogs = ListLogsRequestProto.newBuilder().build();
-    return LogServiceRequestProto.newBuilder().setListLogs(listLogs).build();
-  }
-
-  public static LogServiceRequestProto toGetLogRequestProto(LogName name) {
-    GetLogRequestProto getLog =
-        
GetLogRequestProto.newBuilder().setLogName(toLogNameProto(name)).build();
-    return LogServiceRequestProto.newBuilder().setGetLog(getLog).build();
-  }
-
-  public static LogServiceRequestProto toCloseLogRequestProto(LogName logName) 
{
-    LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
-    CloseLogRequestProto closeLog =
-        CloseLogRequestProto.newBuilder().setLogName(logNameProto).build();
-    return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build();
-  }
-
-  public static CloseLogReplyProto toCloseLogReplyProto() {
-    CloseLogReplyProto.Builder builder = CloseLogReplyProto.newBuilder();
-    return builder.build();
-  }
-
-  public static LogServiceRequestProto toGetStateRequestProto(LogName logName) 
{
-    LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
-    GetStateRequestProto getState =
-        GetStateRequestProto.newBuilder().setLogName(logNameProto).build();
-    return LogServiceRequestProto.newBuilder().setGetState(getState).build();
-  }
-
-  public static LogServiceRequestProto toArchiveLogRequestProto(LogName 
logName) {
-    LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
-    ArchiveLogRequestProto archiveLog =
-        ArchiveLogRequestProto.newBuilder().setLogName(logNameProto).build();
-    return 
LogServiceRequestProto.newBuilder().setArchiveLog(archiveLog).build();
-  }
-
-  public static LogServiceRequestProto toDeleteLogRequestProto(LogName 
logName) {
-    LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
-    DeleteLogRequestProto deleteLog =
-        DeleteLogRequestProto.newBuilder().setLogName(logNameProto).build();
-    return LogServiceRequestProto.newBuilder().setDeleteLog(deleteLog).build();
-  }
-
-  public static DeleteLogReplyProto toDeleteLogReplyProto() {
-    DeleteLogReplyProto.Builder builder = DeleteLogReplyProto.newBuilder();
-    return builder.build();
-  }
 
   public static LogNameProto toLogNameProto(LogName logName) {
     return LogNameProto.newBuilder().setName(logName.getName()).build();
   }
 
-  public static LogName toLogName(LogNameProto logNameProto) {
+  public static LogName toLogName(LogServiceProtos.LogNameProto logNameProto) {
     return LogName.of(logNameProto.getName());
   }
 
   public static LogStreamProto toLogStreamProto(LogStream logStream) {
     LogNameProto logNameProto =
-        
LogNameProto.newBuilder().setName(logStream.getName().getName()).build();
+            
LogNameProto.newBuilder().setName(logStream.getName().getName()).build();
     LogStreamProto logStreamProto =
-        LogStreamProto
-            .newBuilder()
-            .setLogName(logNameProto)
-            .setSize(logStream.getSize())
-            .setState(
-              logStream.getState().equals(State.OPEN) ? LogStreamState.OPEN : 
LogStreamState.CLOSED)
-            .build();
+            LogStreamProto
+                    .newBuilder()
+                    .setLogName(logNameProto)
+                    .setSize(logStream.getSize())
+                    .setState(
+                            logStream.getState().equals(State.OPEN) ? 
LogStreamState.OPEN : LogStreamState.CLOSED)
+                    .build();
     return logStreamProto;
   }
 
@@ -159,26 +61,19 @@ public class LogServiceProtoUtil {
     return new LogStreamImpl(logStream, parent);
   }
 
-  public static CreateLogReplyProto toCreateLogReplyProto(LogStream logStream) 
{
-    LogNameProto logNameProto =
-        
LogNameProto.newBuilder().setName(logStream.getName().getName()).build();
-    LogStreamProto logStreamProto =
-        LogStreamProto
-            .newBuilder()
-            .setLogName(logNameProto)
-            .setSize(logStream.getSize())
-            .setState(
-              logStream.getState().equals(State.OPEN) ? LogStreamState.OPEN : 
LogStreamState.CLOSED)
-            .build();
-    return 
CreateLogReplyProto.newBuilder().setLogStream(logStreamProto).build();
+
+  public static LogServiceRequestProto toCloseLogRequestProto(LogName logName) 
{
+    LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
+    CloseLogRequestProto closeLog =
+        CloseLogRequestProto.newBuilder().setLogName(logNameProto).build();
+    return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build();
   }
 
-  public static ListLogsReplyProto toListLogLogsReplyProto(List<LogStream> 
logStreams) {
-    Builder newBuilder = ListLogsReplyProto.newBuilder();
-    for (LogStream stream : logStreams) {
-      newBuilder.addLogStrems(toLogStreamProto(stream));
-    }
-    return newBuilder.build();
+  public static LogServiceRequestProto toGetStateRequestProto(LogName logName) 
{
+    LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build();
+    GetStateRequestProto getState =
+        GetStateRequestProto.newBuilder().setLogName(logNameProto).build();
+    return LogServiceRequestProto.newBuilder().setGetState(getState).build();
   }
 
   public static ArchiveLogReplyProto toArchiveLogReplyProto() {
@@ -261,13 +156,8 @@ public class LogServiceProtoUtil {
     return retVal;
   }
 
-  public static GetLogReplyProto toGetLogReplyProto(LogStream logStream) {
-    return 
GetLogReplyProto.newBuilder().setLogStream(toLogStreamProto(logStream)).build();
-  }
-
   public static GetStateReplyProto toGetStateReplyProto(boolean exists) {
-    return GetStateReplyProto.newBuilder()
-        .setState(exists ? LogStreamState.OPEN : 
LogStreamState.CLOSED).build();
+    return GetStateReplyProto.newBuilder().build();
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/85d8e025/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
new file mode 100644
index 0000000..a25a7df
--- /dev/null
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.util;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class LogServiceUtils {
+
+    public static Set<RaftPeer> getPeersFromIds(String identity) {
+        return Stream.of(identity.split(",")).map(elem ->
+                new RaftPeer(RaftPeerId.valueOf(elem), elem.replace('_', ':'))
+        ).collect(Collectors.toSet());
+    }
+
+    public static Set<RaftPeer> getPeersFromQuorum(String identity) {
+        return Stream.of(identity.split(",")).map(elem ->
+                new RaftPeer(RaftPeerId.valueOf(elem.replace(':', '_')), elem)
+        ).collect(Collectors.toSet());
+    }
+
+    public static String getHostName() {
+        try (DatagramSocket socket = new DatagramSocket()) {
+            socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
+            return socket.getLocalAddress().getHostName();
+        } catch (Exception e) {
+            return "localhost";
+        }
+
+    }
+}

Reply via email to