Repository: incubator-ratis Updated Branches: refs/heads/master 737db6543 -> 59de6bd6b
RATIS-279 Create administrative API for LogService Signed-off-by: Josh Elser <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/59de6bd6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/59de6bd6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/59de6bd6 Branch: refs/heads/master Commit: 59de6bd6ba92fa24c7efdce0feffb814693c268e Parents: 737db65 Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Tue Oct 9 23:51:12 2018 -0400 Committer: Josh Elser <[email protected]> Committed: Tue Oct 9 23:51:12 2018 -0400 ---------------------------------------------------------------------- .../ratis/logservice/LogServiceFactory.java | 4 +- .../apache/ratis/logservice/api/LogName.java | 12 +- .../apache/ratis/logservice/api/LogService.java | 16 +- .../ratis/logservice/api/LogStateMachine.java | 161 ++++++++++++++--- .../ratis/logservice/impl/BaseLogStream.java | 85 +++++++++ .../ratis/logservice/impl/LogServiceImpl.java | 145 +++++++++++++++ .../logservice/util/LogServiceProtoUtil.java | 179 +++++++++++++++++++ .../ratis/logservice/LogServiceBaseTest.java | 89 +++++++++ .../logservice/TestLogServiceWithGrpc.java | 24 +++ .../logservice/TestLogServiceWithNetty.java | 25 +++ ratis-proto/src/main/proto/Logservice.proto | 80 +++++++++ 11 files changed, 787 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java index 4ef801e..084188e 100644 --- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java @@ -19,6 +19,7 @@ package org.apache.ratis.logservice; import org.apache.ratis.client.RaftClient; import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.impl.LogServiceImpl; public class LogServiceFactory { private static final LogServiceFactory INSTANCE = new LogServiceFactory(); @@ -31,8 +32,7 @@ public class LogServiceFactory { * @param raftClient The client to a Raft quorum. */ public LogService createLogService(RaftClient raftClient) { - //TODO return new LogServiceImpl(); - return null; + return new LogServiceImpl(raftClient); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java index bbeb0a5..3405340 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java @@ -21,6 +21,10 @@ import static java.util.Objects.requireNonNull; import java.util.Objects; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + /** * Identifier to uniquely identify a {@link LogStream}. */ @@ -42,7 +46,7 @@ public class LogName { * of identifying one LogStream/LogName from another. Users need only know how to construct a {@link LogName} * and then use that in their application. 
*/ - String getName() { + public String getName() { return name; } @@ -76,4 +80,10 @@ public class LogName { // TODO Limit allowed characters in the name? return new LogName(name); } + + public static LogName parseFrom(ByteString logName) + throws InvalidProtocolBufferException { + LogNameProto logNameProto = LogNameProto.parseFrom(logName); + return new LogName(logNameProto.getName()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java index 860cb33..c78ed2a 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java @@ -37,7 +37,7 @@ public interface LogService extends AutoCloseable { * * @param name Unique name for this LogStream. */ - LogStream createLog(LogName name); + LogStream createLog(LogName name) throws IOException; /** * Creates a new {@link LogStream} identified by the given name. Throws @@ -46,7 +46,7 @@ public interface LogService extends AutoCloseable { * @param name Unique name for this LogStream. * @param config Configuration object for this LogStream */ - LogStream createLog(LogName name, LogStreamConfiguration config); + LogStream createLog(LogName name, LogStreamConfiguration config) throws IOException; /* * How to get LogStreams that already exist @@ -56,12 +56,12 @@ public interface LogService extends AutoCloseable { * * @param name The name of the LogStream */ - LogStream getLog(LogName name); + LogStream getLog(LogName name) throws IOException; /** * Lists all {@link LogStream} instances known by this LogService. 
*/ - Iterator<LogStream> listLogs(); + Iterator<LogStream> listLogs() throws IOException; /* * How to close, archive, and delete LogStreams @@ -74,14 +74,14 @@ public interface LogService extends AutoCloseable { * @param name The name of the log to close */ // TODO this name sucks, confusion WRT the Java Closeable interface. - void closeLog(LogName name); + void closeLog(LogName name) throws IOException; /** * Returns the current {@link State} of the log identified by {@code name}. * * @param name The name of a log */ - State getState(LogName name); + State getState(LogName name) throws IOException; /** * Archives the given log out of the state machine and into a configurable long-term storage. A log must be @@ -89,13 +89,13 @@ public interface LogService extends AutoCloseable { * * @param name The name of the log to archive. */ - void archiveLog(LogName name); + void archiveLog(LogName name) throws IOException; /** * Deletes the {@link LogStream}. * @param name The name of the LogStream */ - void deleteLog(LogName name); + void deleteLog(LogName name) throws IOException; /* * Change the configuration of a LogStream or manipulate a LogStream's listeners http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java index cdc17f3..84689be 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java @@ -17,6 +17,9 @@ */ package org.apache.ratis.logservice.api; +import org.apache.ratis.logservice.api.LogStream.State; +import org.apache.ratis.logservice.impl.BaseLogStream; +import 
org.apache.ratis.logservice.util.LogServiceProtoUtil; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; @@ -25,18 +28,31 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.util.AutoCloseableLock; - import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import java.io.*; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.locks.ReentrantReadWriteLock; @@ -166,32 +182,133 @@ public class LogStateMachine extends BaseStateMachine { public CompletableFuture<Message> applyTransaction(TransactionContext trx) { try { final LogEntryProto entry = trx.getLogEntry(); - final LogMessage logMessage = LogMessage.parseFrom((entry.getSmLogEntry().getData())); - - final long index = entry.getIndex(); - Long val = null; - LogName name = null; - try (final AutoCloseableLock writeLock = writeLock()) { - name = logMessage.getLogName(); - long dataLength = logMessage.getData().length; - val = state.get(name); - if (val == null) { - val = new Long(0); + LogServiceRequestProto logServiceRequestProto = + LogServiceRequestProto.parseFrom(entry.getSmLogEntry().getData()); + CompletableFuture<Message> f = null; + switch (logServiceRequestProto.getRequestCase()) { + case LOGMESSAGE: + org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage logMessage2 = logServiceRequestProto.getLogMessage(); + final LogMessage logMessage = LogMessage.parseFrom((entry.getSmLogEntry().getData())); + + final long index = entry.getIndex(); + Long val = null; + LogName name = null; + try (final AutoCloseableLock writeLock = writeLock()) { + name = logMessage.getLogName(); + long dataLength = logMessage.getData().length; + val = state.get(name); + if (val == null) { + val = new Long(0); + } + state.put(name, val + dataLength); + updateLastAppliedTermIndex(entry.getTerm(), index); } - state.put(name, val + dataLength); - updateLastAppliedTermIndex(entry.getTerm(), index); - } - final CompletableFuture<Message> f = - CompletableFuture.completedFuture(new LogMessage(name, val)); - final RaftProtos.RaftPeerRole role = trx.getServerRole(); - LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, logMessage, val); - if (LOG.isTraceEnabled()) { - LOG.trace("{}-{}: variables={}", getId(), index, state); + f = + CompletableFuture.completedFuture(new LogMessage(name, val)); + final RaftProtos.RaftPeerRole 
role = trx.getServerRole(); + LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, logMessage, val); + if (LOG.isTraceEnabled()) { + LOG.trace("{}-{}: variables={}", getId(), index, state); + } + return f; + case CREATELOG: + return processCreateLogRequest(logServiceRequestProto); + case LISTLOGS: + return processListLogsRequest(); + case GETLOG: + return processGetLogRequest(logServiceRequestProto); + case GETSTATE: + return processGetStateRequest(logServiceRequestProto); + case ARCHIVELOG: + return processArchiveLog(logServiceRequestProto); + case CLOSELOG: + return processCloseLog(logServiceRequestProto); + case DELETELOG: + return processDeleteLog(logServiceRequestProto); + default: + return null; } - return f; } catch (InvalidProtocolBufferException e) { // TODO exception handling throw new RuntimeException(e); } } + + private CompletableFuture<Message> + processDeleteLog(LogServiceRequestProto logServiceRequestProto) { + DeleteLogRequestProto deleteLog = logServiceRequestProto.getDeleteLog(); + LogName logName = LogServiceProtoUtil.toLogName(deleteLog.getLogName()); + try (final AutoCloseableLock writeLock = writeLock()) { + state.remove(logName); + } + // TODO need to handle exceptions while operating with files. + return CompletableFuture.completedFuture(Message + .valueOf(DeleteLogReplyProto.newBuilder().build().toByteString())); + } + + private CompletableFuture<Message> processCloseLog(LogServiceRequestProto logServiceRequestProto) { + CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog(); + LogName logName = LogServiceProtoUtil.toLogName(closeLog.getLogName()); + // Need to check whether the file is opened if opened close it. + // TODO need to handle exceptions while operating with files. 
+ return CompletableFuture.completedFuture(Message + .valueOf(CloseLogReplyProto.newBuilder().build().toByteString())); + } + + private CompletableFuture<Message> + processArchiveLog(LogServiceRequestProto logServiceRequestProto) { + ArchiveLogRequestProto archiveLog = logServiceRequestProto.getArchiveLog(); + LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName()); + // Handle log archiving. + return CompletableFuture.completedFuture(Message + .valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString())); + } + + private CompletableFuture<Message> processGetStateRequest( + LogServiceRequestProto logServiceRequestProto) { + GetStateRequestProto getState = logServiceRequestProto.getGetState(); + LogName logName = LogServiceProtoUtil.toLogName(getState.getLogName()); + return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil + .toGetStateReplyProto(state.containsKey(logName)).toByteString())); + } + + private CompletableFuture<Message> processCreateLogRequest( + LogServiceRequestProto logServiceRequestProto) { + Long val; + LogName name; + try (final AutoCloseableLock writeLock = writeLock()) { + CreateLogRequestProto createLog = logServiceRequestProto.getCreateLog(); + name = LogServiceProtoUtil.toLogName(createLog.getLogName()); + val = state.get(name); + if (val == null) { + val = new Long(0); + } + state.put(name, val); + } + return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil + .toCreateLogReplyProto(new BaseLogStream(name, State.OPEN, val)).toByteString())); + } + + private CompletableFuture<Message> processListLogsRequest() { + List<LogStream> logStreams = new ArrayList<LogStream>(state.size()); + for (Entry<LogName, Long> e : state.entrySet()) { + logStreams.add(new BaseLogStream(e.getKey(), State.OPEN, e.getValue())); + } + return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil + .toListLogLogsReplyProto(logStreams).toByteString())); + } + + private 
CompletableFuture<Message> processGetLogRequest( + LogServiceRequestProto logServiceRequestProto) { + GetLogRequestProto getLog = logServiceRequestProto.getGetLog(); + LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName()); + if (state.containsKey(logName)) { + return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil + .toGetLogReplyProto(new BaseLogStream(logName, State.OPEN, state.get(logName))) + .toByteString())); + } else { + return CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder() + .build().toByteString())); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java new file mode 100644 index 0000000..4fe830e --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.logservice.impl; + +import java.util.Set; + +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogReader; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.api.LogStreamConfiguration; +import org.apache.ratis.logservice.api.LogWriter; +import org.apache.ratis.logservice.api.RecordListener; + +public class BaseLogStream implements LogStream { + private LogName logName; + private State state; + private long size; + + public BaseLogStream(LogName logName, State state, long size) { + this.logName = logName; + this.state = state; + this.size = size; + } + + @Override + public LogName getName() { + return logName; + } + + @Override + public State getState() { + return state; + } + + @Override + public long getSize() { + return size; + } + + @Override + public LogReader createReader() { + // TODO Auto-generated method stub + return null; + } + + @Override + public LogWriter createWriter() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getLastRecordId() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Set<RecordListener> getRecordListeners() { + // TODO Auto-generated method stub + return null; + } + + @Override + public LogStreamConfiguration getConfiguration() { + // TODO Auto-generated method stub + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java new file mode 100644 index 0000000..407e498 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java @@ -0,0 +1,145 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.impl; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.api.LogStream.State; +import org.apache.ratis.logservice.api.LogStreamConfiguration; +import org.apache.ratis.logservice.api.RecordListener; +import org.apache.ratis.logservice.util.LogServiceProtoUtil; +import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto; +import 
org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; + +public class LogServiceImpl implements LogService { + + final private RaftClient raftClient; + + public LogServiceImpl(RaftClient raftClient) { + this.raftClient = raftClient; + } + + @Override + public LogStream createLog(LogName name) throws IOException { + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name) + .toByteString())); + CreateLogReplyProto parseFrom = CreateLogReplyProto.parseFrom(reply.getMessage().getContent()); + return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream()); + } + + @Override + public LogStream createLog(LogName name, LogStreamConfiguration config) throws IOException { + // TODO need to make changes in the create log request to pass config. + return createLog(name); + } + + @Override + public LogStream getLog(LogName name) throws IOException { + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name) + .toByteString())); + GetLogReplyProto parseFrom = GetLogReplyProto.parseFrom(reply.getMessage().getContent()); + return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream()); + } + + @Override + public Iterator<LogStream> listLogs() throws IOException { + RaftClientReply reply = + raftClient + .send(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString())); + ListLogsReplyProto parseFrom = ListLogsReplyProto.parseFrom(reply.getMessage().getContent()); + List<LogStreamProto> logStremsList = parseFrom.getLogStremsList(); + return LogServiceProtoUtil.toListLogStreams(logStremsList).iterator(); + } + + @Override + public void closeLog(LogName name) throws IOException { + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil.toCloseLogRequestProto(name) 
+ .toByteString())); + CloseLogReplyProto parseFrom = CloseLogReplyProto.parseFrom(reply.getMessage().getContent()); + } + + @Override + public State getState(LogName name) throws IOException { + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name) + .toByteString())); + GetStateReplyProto parseFrom = GetStateReplyProto.parseFrom(reply.getMessage().getContent()); + return parseFrom.getState() == LogStreamState.OPEN ? State.OPEN : State.CLOSED; + } + + @Override + public void archiveLog(LogName name) throws IOException { + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil.toArchiveLogRequestProto(name) + .toByteString())); + ArchiveLogReplyProto parseFrom = + ArchiveLogReplyProto.parseFrom(reply.getMessage().getContent()); + } + + @Override + public void deleteLog(LogName name) throws IOException { + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil.toDeleteLogRequestProto(name) + .toByteString())); + DeleteLogReplyProto parseFrom = DeleteLogReplyProto.parseFrom(reply.getMessage().getContent()); + } + + @Override + public void updateConfiguration(LogName name, LogStreamConfiguration config) { + // TODO Auto-generated method stub + + } + + @Override + public void addRecordListener(LogName name, RecordListener listener) { + // TODO Auto-generated method stub + + } + + @Override + public void removeRecordListener(LogName name, RecordListener listener) { + // TODO Auto-generated method stub + + } + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java new file mode 100644 index 0000000..998dc17 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.ratis.logservice.util; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.ratis.logservice.api.LogMessage; +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.api.LogStream.State; +import org.apache.ratis.logservice.impl.BaseLogStream; +import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto.Builder; +import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class LogServiceProtoUtil { + public static LogServiceRequestProto toCreateLogRequestProto(LogName logName) { + LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); + CreateLogRequestProto createLog = + CreateLogRequestProto.newBuilder().setLogName(logNameProto).build(); + return LogServiceRequestProto.newBuilder().setCreateLog(createLog).build(); + } + + public static LogServiceRequestProto toListLogRequestProto() { + ListLogsRequestProto listLogs = ListLogsRequestProto.newBuilder().build(); + return LogServiceRequestProto.newBuilder().setListLogs(listLogs).build(); + } + + public static LogServiceRequestProto toGetLogRequestProto(LogName name) { + GetLogRequestProto getLog = + GetLogRequestProto.newBuilder().setLogName(toLogNameProto(name)).build(); + return LogServiceRequestProto.newBuilder().setGetLog(getLog).build(); + } + + public static LogServiceRequestProto toCloseLogRequestProto(LogName logName) { + LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); + CloseLogRequestProto closeLog = + CloseLogRequestProto.newBuilder().setLogName(logNameProto).build(); + return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build(); + } + + public static LogServiceRequestProto toGetStateRequestProto(LogName logName) { + LogNameProto logNameProto = 
LogNameProto.newBuilder().setName(logName.getName()).build(); + GetStateRequestProto getState = + GetStateRequestProto.newBuilder().setLogName(logNameProto).build(); + return LogServiceRequestProto.newBuilder().setGetState(getState).build(); + } + + public static LogServiceRequestProto toArchiveLogRequestProto(LogName logName) { + LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); + ArchiveLogRequestProto archiveLog = + ArchiveLogRequestProto.newBuilder().setLogName(logNameProto).build(); + return LogServiceRequestProto.newBuilder().setArchiveLog(archiveLog).build(); + } + + public static LogServiceRequestProto toDeleteLogRequestProto(LogName logName) { + LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); + DeleteLogRequestProto deleteLog = + DeleteLogRequestProto.newBuilder().setLogName(logNameProto).build(); + return LogServiceRequestProto.newBuilder().setDeleteLog(deleteLog).build(); + } + + public static LogMessage toLogMessage( + org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage message) { + if (!message.getData().isEmpty()) { + return new LogMessage(LogName.of(message.getLogName()), message.getData().toByteArray()); + } else if (message.getLength() != 0) { + return new LogMessage(LogName.of(message.getLogName()), message.getLength()); + } else { + return new LogMessage(LogName.of(message.getLogName())); + } + } + + public static LogNameProto toLogNameProto(LogName logName) { + return LogNameProto.newBuilder().setName(logName.getName()).build(); + } + + public static LogName toLogName(LogNameProto logNameProto) { + return LogName.of(logNameProto.getName()); + } + + public static LogStreamProto toLogStreamProto(LogStream logStream) { + LogNameProto logNameProto = + LogNameProto.newBuilder().setName(logStream.getName().getName()).build(); + LogStreamProto logStreamProto = + LogStreamProto + .newBuilder() + .setLogName(logNameProto) + .setSize(logStream.getSize()) + 
.setState( + logStream.getState().equals(State.OPEN) ? LogStreamState.OPEN : LogStreamState.CLOSED) + .build(); + return logStreamProto; + } + + public static LogStream toLogStream(LogStreamProto logStream) { + return new BaseLogStream(toLogName(logStream.getLogName()), + (logStream.getState() == LogStreamState.OPEN ? State.OPEN : State.CLOSED), + logStream.getSize()); + } + + public static CreateLogReplyProto toCreateLogReplyProto(LogStream logStream) { + LogNameProto logNameProto = + LogNameProto.newBuilder().setName(logStream.getName().getName()).build(); + LogStreamProto logStreamProto = + LogStreamProto + .newBuilder() + .setLogName(logNameProto) + .setSize(logStream.getSize()) + .setState( + logStream.getState().equals(State.OPEN) ? LogStreamState.OPEN : LogStreamState.CLOSED) + .build(); + return CreateLogReplyProto.newBuilder().setLogStream(logStreamProto).build(); + } + + public static ListLogsReplyProto toListLogLogsReplyProto(List<LogStream> logStreams) { + Builder newBuilder = ListLogsReplyProto.newBuilder(); + for (LogStream stream : logStreams) { + newBuilder.addLogStrems(toLogStreamProto(stream)); + } + return newBuilder.build(); + } + + public static List<LogStream> toListLogStreams(List<LogStreamProto> logStreamProtos) { + List<LogStream> logStreams = new ArrayList<>(logStreamProtos.size()); + for (LogStreamProto proto : logStreamProtos) { + logStreams.add(toLogStream(proto)); + } + return logStreams; + } + + public static GetLogReplyProto toGetLogReplyProto(LogStream logStream) { + return GetLogReplyProto.newBuilder().setLogStream(toLogStreamProto(logStream)).build(); + } + + public static GetStateReplyProto toGetStateReplyProto(boolean exists) { + return GetStateReplyProto.newBuilder() + .setState(exists ? 
LogStreamState.OPEN : LogStreamState.CLOSED).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java new file mode 100644 index 0000000..ecdc905 --- /dev/null +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.logservice; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.api.LogStateMachine; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.api.LogStream.State; +import org.apache.ratis.statemachine.StateMachine; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class LogServiceBaseTest<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + public static final Logger LOG = LoggerFactory.getLogger(LogServiceBaseTest.class); + + { + final RaftProperties p = getProperties(); + p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + LogStateMachine.class, StateMachine.class); + } + + static final int NUM_PEERS = 3; + CLUSTER cluster; + + @Before + public void setUpCluster() throws IOException, InterruptedException { + cluster = newCluster(NUM_PEERS); + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + } + + @Test + public void testLogServiceAdminAPIs() throws Exception { + RaftClient raftClient = + RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup()) + .build(); + LogService logService = LogServiceFactory.getInstance().createLogService(raftClient); + LogName logName = LogName.of("log1"); + LogStream logStream = logService.createLog(logName); + assertEquals("log1", logStream.getName().getName()); + assertEquals(State.OPEN, logStream.getState()); + assertEquals(0, logStream.getSize()); + logService.getLog(logName); + assertEquals("log1", 
logStream.getName().getName()); + assertEquals(State.OPEN, logStream.getState()); + assertEquals(0, logStream.getSize()); + logStream = logService.listLogs().next(); + assertEquals("log1", logStream.getName().getName()); + assertEquals(State.OPEN, logStream.getState()); + assertEquals(0, logStream.getSize()); + State state = logService.getState(logName); + assertEquals(State.OPEN, state); + } + + @After + public void tearDown() { + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithGrpc.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithGrpc.java new file mode 100644 index 0000000..c3549a8 --- /dev/null +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithGrpc.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.logservice; + +import org.apache.ratis.grpc.MiniRaftClusterWithGrpc; + +public class TestLogServiceWithGrpc extends LogServiceBaseTest<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithNetty.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithNetty.java new file mode 100644 index 0000000..e483627 --- /dev/null +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/TestLogServiceWithNetty.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.ratis.logservice;
+
+import org.apache.ratis.netty.MiniRaftClusterWithNetty;
+
+/**
+ * Binds the tests inherited from {@link LogServiceBaseTest} to
+ * {@link MiniRaftClusterWithNetty}; the class adds no members of its own.
+ */
+public class TestLogServiceWithNetty extends LogServiceBaseTest<MiniRaftClusterWithNetty>
+    implements MiniRaftClusterWithNetty.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/59de6bd6/ratis-proto/src/main/proto/Logservice.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Logservice.proto b/ratis-proto/src/main/proto/Logservice.proto
index 8ea32f2..40964e0 100644
--- a/ratis-proto/src/main/proto/Logservice.proto
+++ b/ratis-proto/src/main/proto/Logservice.proto
@@ -33,3 +33,83 @@ message LogMessage {
   uint64 length = 3;
   bytes data = 4;
 }
+// Envelope for LogService requests; the oneof carries exactly one request type.
+message LogServiceRequestProto {
+  oneof Request {
+    LogMessage logMessage = 1;
+    CreateLogRequestProto createLog = 2;
+    ListLogsRequestProto listLogs = 3;
+    GetLogRequestProto getLog = 4;
+    CloseLogRequestProto closeLog = 5;
+    GetStateRequestProto getState = 6;
+    ArchiveLogRequestProto archiveLog = 7;
+    DeleteLogRequestProto deleteLog = 8;
+  }
+}
+
+// Name of a log, carried as a plain string.
+message LogNameProto {
+  string name = 1;
+}
+
+// Request naming the log to create.
+message CreateLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request to list logs.
+// NOTE(review): "logSreams" looks like a misspelling of "logStreams", and the
+// request builder in LogServiceProtoUtil never sets this field -- confirm whether
+// it is needed. Renaming it changes the generated accessor names.
+message ListLogsRequestProto {
+  repeated LogStreamProto logSreams = 1;
+}
+
+// Request naming the log to look up.
+message GetLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request naming the log to close.
+message CloseLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request naming the log whose state is queried.
+message GetStateRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request naming the log to archive.
+message ArchiveLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Request naming the log to delete.
+message DeleteLogRequestProto {
+  LogNameProto logName = 1;
+}
+
+// Description of one log stream: its name, size and state.
+message LogStreamProto {
+  LogNameProto logName = 1;
+  uint64 size = 2;
+  LogStreamState state = 3;
+}
+
+// State of a log stream. OPEN is the zero value, so it is the default for an unset field.
+enum LogStreamState {
+  OPEN = 0;
+  CLOSED = 1;
+}
+
+// Reply to a create-log request, carrying the resulting stream.
+message CreateLogReplyProto {
+  LogStreamProto logStream = 1;
+}
+
+// Reply to a get-log request, carrying the requested stream.
+message GetLogReplyProto {
+  LogStreamProto logStream = 1;
+}
+
+// Reply to a list-logs request, one entry per stream.
+// NOTE(review): "logStrems" looks like a misspelling of "logStreams". The generated
+// accessor addLogStrems is called from LogServiceProtoUtil, so a rename has to be
+// made together with its callers.
+message ListLogsReplyProto {
+  repeated LogStreamProto logStrems = 1;
+}
+
+// Reply to a close-log request; carries no fields.
+message CloseLogReplyProto {
+}
+
+// Reply to a get-state request, carrying the state of the queried log.
+message GetStateReplyProto {
+  LogStreamState state = 1;
+}
+
+// Reply to an archive-log request; carries no fields.
+message ArchiveLogReplyProto {
+}
+
+// Reply to a delete-log request; carries no fields.
+message DeleteLogReplyProto {
+}
