RATIS-274. Read/Write-path of log stream state machine Signed-off-by: Josh Elser <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/48d6a2a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/48d6a2a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/48d6a2a4 Branch: refs/heads/master Commit: 48d6a2a421a8b44f4080bad0ef4c004cbf7b456d Parents: 9774c5c Author: Vladimir Rodionov <[email protected]> Authored: Wed Oct 17 15:33:06 2018 -0700 Committer: Josh Elser <[email protected]> Committed: Thu Oct 18 14:43:50 2018 -0400 ---------------------------------------------------------------------- .../ratis/logservice/LogServiceFactory.java | 8 +- .../apache/ratis/logservice/api/LogMessage.java | 139 +-------- .../apache/ratis/logservice/api/LogService.java | 23 +- .../logservice/api/LogServiceConfiguration.java | 88 ++++++ .../ratis/logservice/api/LogStateMachine.java | 293 ++++++++++++++----- .../apache/ratis/logservice/api/LogStream.java | 28 +- .../logservice/api/LogStreamConfiguration.java | 64 ---- .../ratis/logservice/dummy/DummyLogService.java | 24 +- .../ratis/logservice/dummy/DummyLogStream.java | 29 +- .../ratis/logservice/impl/BaseLogStream.java | 85 ------ .../ratis/logservice/impl/LogReaderImpl.java | 155 ++++++++++ .../ratis/logservice/impl/LogServiceImpl.java | 57 ++-- .../ratis/logservice/impl/LogStreamImpl.java | 159 ++++++++++ .../ratis/logservice/impl/LogWriterImpl.java | 97 ++++++ .../logservice/util/LogServiceProtoUtil.java | 218 ++++++++++++-- .../org/apache/ratis/logservice/util/Utils.java | 48 --- .../ratis/logservice/LogServiceBaseTest.java | 7 +- .../ratis/logservice/api/TestApiExample.java | 4 +- .../ratis/logservice/api/TestLogMessage.java | 55 ---- .../util/TestLogServiceProtoUtil.java | 290 ++++++++++++++++++ .../apache/ratis/logservice/util/TestUtils.java | 84 +----- ratis-proto/src/main/proto/Logservice.proto | 97 ++++-- .../ratis/server/impl/RaftServerProxy.java | 2 +- 23 files changed, 1438 insertions(+), 616 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java index 084188e..1b9966a 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java @@ -19,9 +19,13 @@ package org.apache.ratis.logservice; import org.apache.ratis.client.RaftClient; import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.impl.LogServiceImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LogServiceFactory { + public static final Logger LOG = LoggerFactory.getLogger(LogServiceFactory.class); private static final LogServiceFactory INSTANCE = new LogServiceFactory(); private LogServiceFactory() {} @@ -31,8 +35,8 @@ public class LogServiceFactory { * * @param raftClient The client to a Raft quorum. */ - public LogService createLogService(RaftClient raftClient) { - return new LogServiceImpl(raftClient); + public LogService createLogService(RaftClient raftClient, LogServiceConfiguration config) { + return new LogServiceImpl(raftClient, config); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java index cfc53a5..3d3bb56 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java @@ -17,159 +17,34 @@ */ package org.apache.ratis.logservice.api; -import java.nio.charset.Charset; - import org.apache.ratis.protocol.Message; -import org.apache.ratis.proto.logservice.LogServiceProtos; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +public abstract class LogMessage implements Message { -public class LogMessage implements Message { - public static final Charset UTF8 = Charset.forName("UTF-8"); /* - * Type of message + * Type of messages */ public static enum Type { - READ_REQUEST, READ_REPLY, WRITE + APPEND, CLOSE, SYNC, GET_START_INDEX, READ, GET_LENGTH } /* - * For all READ and WRITE requests - */ - private final LogName name; - - /* - * Set only for READ_REPLY response - */ - private long length; - - /* - * Pay load for WRITE request - */ - private byte[] data; - - /* - * Type of message - */ - - private Type type; - - /** - * Constructor for WRITE request - * @param logName name of a log - * @param data pay load data - */ - public LogMessage(LogName logName, byte[] data) { - this.name = logName; - this.data = data; - this.type = Type.WRITE; - } - - /** - * Constructor for READ reply - * @param logName name of a log - * @param length length of a log - */ - public LogMessage(LogName logName, long length) { - this.name = logName; - this.length = length; - this.type = Type.READ_REPLY; - } - - /** - * Constructor for READ request - * @param logName name of a log + * Log name */ - public LogMessage(LogName logName) { - this.name = logName; - this.type = Type.READ_REQUEST; - } - - public static LogMessage parseFrom(ByteString data) - throws InvalidProtocolBufferException { - LogServiceProtos.LogMessage msg = LogServiceProtos.LogMessage.parseFrom(data); - LogServiceProtos.MessageType type = msg.getType(); - long length = 0; - LogName name = null; - byte[] bdata = null; - name = LogName.of(msg.getLogName()); - switch (type) { - case READ_REPLY: - length = msg.getLength(); - return new LogMessage(name, length); - case READ_REQUEST: - return new LogMessage(name); - case WRITE: - bdata = msg.getData().toByteArray(); - return new LogMessage(name, bdata); - default: - //TODO replace exception - throw new RuntimeException("Wrong message type: "+ type); - } - } - + protected LogName logName; /** * Get log name * @return log name */ public LogName getLogName() { - return name; - } - - /** - * Get log length - * @return log length - */ - public long getLength() { - return length; - } - - /** - * Get log message data - * @return data - */ - public byte[] getData() { - return data; + return logName; } /** * Get message type * @return message type */ - public Type getType() { - return type; - } + public abstract Type getType(); - - @SuppressWarnings("deprecation") - @Override - public ByteString getContent() { - LogServiceProtos.LogMessage.Builder builder = LogServiceProtos.LogMessage.newBuilder(); - builder.setType(LogServiceProtos.MessageType.valueOf(type.ordinal())); - builder.setLogName(name.getName()); - switch (type) { - case READ_REPLY: - builder.setLength(length); - break; - case WRITE: - builder.setData(ByteString.copyFrom(data)); - break; - default: - } - - return builder.build().toByteString(); - } - - @Override - public String toString() { - String s = type.name() + " : log=" + name.getName(); - if (type == Type.READ_REPLY) { - s += " len=" + length; - } else if (type == Type.WRITE) { - s += " data len=" + data.length; - } - return s; - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java index c78ed2a..de47c62 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java @@ -20,6 +20,7 @@ package org.apache.ratis.logservice.api; import java.io.IOException; import java.util.Iterator; +import org.apache.ratis.client.RaftClient; import org.apache.ratis.logservice.api.LogStream.State; /** @@ -46,7 +47,7 @@ public interface LogService extends AutoCloseable { * @param name Unique name for this LogStream. * @param config Configuration object for this LogStream */ - LogStream createLog(LogName name, LogStreamConfiguration config) throws IOException; + LogStream createLog(LogName name, LogServiceConfiguration config) throws IOException; /* * How to get LogStreams that already exist @@ -107,7 +108,7 @@ public interface LogService extends AutoCloseable { * * @param config The new configuration object */ - void updateConfiguration(LogName name, LogStreamConfiguration config); + void updateConfiguration(LogName name, LogServiceConfiguration config); /** * Registers a {@link RecordListener} with the log which will receive all records written using @@ -129,11 +130,27 @@ public interface LogService extends AutoCloseable { * * @param the log's name * @param listener The listener to remove + * @return */ - void removeRecordListener(LogName name, RecordListener listener); + boolean removeRecordListener(LogName name, RecordListener listener); /** * Overrides {@link #close()} in {@link AutoCloseable} to throw an IOException. */ + @Override void close() throws IOException; + + /** + * Gets Raft client object + * @return raft client object + */ + RaftClient getRaftClient(); + + /** + * Gets configuration + * @return configuration + */ + LogServiceConfiguration getConfiguration(); + + } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceConfiguration.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceConfiguration.java new file mode 100644 index 0000000..78119e5 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceConfiguration.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +/** + * An encapsulation of configuration for a LogService. + */ +public class LogServiceConfiguration { + + private ConcurrentHashMap<String, String> configMap = + new ConcurrentHashMap<String, String>(); + + /** + * Ctor + */ + public LogServiceConfiguration() { + } + + /** + * Fetches the value for the given key from the configuration. If there is no entry for + * the given key, {@code null} is returned. + * + * @param key The configuration key + */ + public String get(String key) { + return configMap.get(key); + } + + /** + * Sets the given key and value into this configuration. The configuration key may + * not be null. A null value removes the key from the configuration. + * + * @param key Configuration key, must be non-null + * @param value Configuration value + */ + public void set(String key, String value) { + configMap.put(key, value); + } + + /** + * Removes any entry with the given key from the configuration. If there is no entry + * for the given key, this method returns without error. The provided key must be + * non-null. + * + * @param key The configuration key, must be non-null + * @return value + */ + public String remove(String key) { + return configMap.remove(key); + } + + /** + * Sets the collection of key-value pairs into the configuration. This is functionally + * equivalent to calling {@link #set(String, String)} numerous time. + */ + public void setMany(Iterable<Entry<String,String>> entries) { + for (Entry<String, String> entry: entries ) { + configMap.put(entry.getKey(), entry.getValue()); + } + } + + /** + * Returns an immutable view over the configuration as a {@code Map}. + */ + public Map<String,String> asMap() { + return Collections.unmodifiableMap(configMap); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java index 4bfa294..1a3edc2 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java @@ -17,17 +17,28 @@ */ package org.apache.ratis.logservice.api; -import org.apache.ratis.logservice.api.LogStream.State; -import org.apache.ratis.logservice.impl.BaseLogStream; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.ratis.logservice.impl.LogStreamImpl; import org.apache.ratis.logservice.util.LogServiceProtoUtil; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.impl.RaftServerConstants; -import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto; @@ -35,35 +46,48 @@ import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogRequestProto; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.impl.ServerState; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftLogIOException; +import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.util.AutoCloseableLock; -import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - -import java.io.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LogStateMachine extends BaseStateMachine { + public static final Logger LOG = LoggerFactory.getLogger(LogStateMachine.class); + /* + * State + */ private final Map<LogName, Long> state = new ConcurrentHashMap<>(); private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private RaftLog log; + + private RaftGroupId groupId; + + private RaftServerProxy proxy ; + private AutoCloseableLock readLock() { return AutoCloseableLock.acquire(lock.readLock()); } @@ -72,6 +96,9 @@ public class LogStateMachine extends BaseStateMachine { return AutoCloseableLock.acquire(lock.writeLock()); } + /** + * Reset state machine + */ void reset() { state.clear(); setLastAppliedTermIndex(null); @@ -82,9 +109,18 @@ public class LogStateMachine extends BaseStateMachine { RaftStorage raftStorage) throws IOException { super.initialize(server, groupId, raftStorage); this.storage.init(raftStorage); + this.proxy = (RaftServerProxy) server; + this.groupId = groupId; loadSnapshot(storage.getLatestSnapshot()); } + private void checkInitialization() throws IOException { + if (this.log == null) { + ServerState state = proxy.getImpl(groupId).getState(); + this.log = state.getLog(); + } + } + @Override public void reinitialize() throws IOException { close(); @@ -152,26 +188,147 @@ public class LogStateMachine extends BaseStateMachine { @Override public CompletableFuture<Message> query(Message request) { - LogMessage msg = null; + try { - msg = LogMessage.parseFrom(request.getContent()); - LogName logName = msg.getLogName(); - Long len = null; - try(final AutoCloseableLock readLock = readLock()) { - len = state.get(logName); - if (len == null) { - len = new Long(-1); - } + + checkInitialization(); + LogServiceRequestProto logServiceRequestProto = + LogServiceRequestProto.parseFrom(request.getContent()); + + switch (logServiceRequestProto.getRequestCase()) { + + case READNEXTQUERY: + return processReadRequest(logServiceRequestProto); + case LENGTHQUERY: + return processGetLengthRequest(logServiceRequestProto); + case STARTINDEXQUERY: + return processGetStartIndexRequest(logServiceRequestProto); + case LISTLOGS: + return processListLogsRequest(); + case GETLOG: + return processGetLogRequest(logServiceRequestProto); + case GETSTATE: + return processGetStateRequest(logServiceRequestProto); + default: + // TODO + throw new RuntimeException( + "Wrong message type for query: " + logServiceRequestProto.getRequestCase()); } - LOG.debug("QUERY: {}, RESULT: {}", msg, len); - return CompletableFuture.completedFuture(new LogMessage (logName, len)); - } catch (InvalidProtocolBufferException e) { - //TODO exception handling + + } catch (IOException e) { + // TODO exception handling throw new RuntimeException(e); } } + /** + * Process get start index request + * @param msg message + * @return reply message + */ + private CompletableFuture<Message> + processGetStartIndexRequest(LogServiceRequestProto proto) + { + long startIndex =log.getStartIndex(); + return CompletableFuture.completedFuture(Message + .valueOf(LogServiceProtoUtil.toGetLogStartIndexReplyProto(startIndex, null).toByteString())); + } + + /** + * Process get length request + * @param msg message + * @return reply message + */ + private CompletableFuture<Message> processGetLengthRequest(LogServiceRequestProto proto) { + Long len = null; + GetLogLengthRequestProto msgProto = proto.getLengthQuery(); + LogName logName = LogName.of(msgProto.getLogName().getName()); + try(final AutoCloseableLock readLock = readLock()) { + len = state.get(logName); + if (len == null) { + len = new Long(-1); + } + } + LOG.debug("QUERY: {}, RESULT: {}", msgProto, len); + return CompletableFuture.completedFuture(Message + .valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(len, null).toByteString())); + } + + /** + * Process read log entries request + * @param msg message + * @return reply message + */ + private CompletableFuture<Message> processReadRequest(LogServiceRequestProto proto) { + + ReadLogRequestProto msgProto = proto.getReadNextQuery(); + long startRecordId = msgProto.getStartRecordId(); + int num = msgProto.getNumRecords(); + Throwable t = null; + List<byte[]> list = new ArrayList<byte[]>(); + for (long index = startRecordId; index < startRecordId + num; index++) { + try { + list.add(log.getEntryWithData(index).getEntry().getStateMachineLogEntry().getLogData().toByteArray()); + } catch(RaftLogIOException e) { + t = e; + list = null; + break; + } + } + return CompletableFuture.completedFuture(Message + .valueOf(LogServiceProtoUtil.toReadLogReplyProto(list, t).toByteString())); + } + + /** + * Process sync request + * @param trx transaction + * @param logMessage message + * @return reply message + */ + private CompletableFuture<Message> processSyncRequest(TransactionContext trx, + LogServiceRequestProto logMessage) { + + // TODO + return CompletableFuture.completedFuture(Message + .valueOf(LogServiceProtoUtil.toSyncLogReplyProto(null).toByteString())); + + } + + private CompletableFuture<Message> processAppendRequest(TransactionContext trx, + LogServiceRequestProto logProto) { + + final LogEntryProto entry = trx.getLogEntry(); + AppendLogEntryRequestProto proto = logProto.getAppendRequest(); + final long index = entry.getIndex(); + Long val = null; + LogName name = null; + long total = 0; + try (final AutoCloseableLock writeLock = writeLock()) { + name = LogServiceProtoUtil.toLogName(proto.getLogName()); + List<byte[]> entries = LogServiceProtoUtil.toListByteArray(proto.getDataList()); + for (byte[] bb : entries) { + total += bb.length; + } + val = state.get(name); + if (val == null) { + val = new Long(0); + } + state.put(name, val + total); + // TODO do we need this for other write request (close, sync) + updateLastAppliedTermIndex(entry.getTerm(), index); + } + final CompletableFuture<Message> f = + // TODO record ids? + CompletableFuture.completedFuture(Message + .valueOf(LogServiceProtoUtil.toAppendLogReplyProto(null, null).toByteString())); + final RaftProtos.RaftPeerRole role = trx.getServerRole(); + LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, val); + if (LOG.isTraceEnabled()) { + LOG.trace("{}-{}: variables={}", getId(), index, state); + } + return f; + } @Override public void close() { @@ -181,54 +338,29 @@ public class LogStateMachine extends BaseStateMachine { @Override public CompletableFuture<Message> applyTransaction(TransactionContext trx) { try { + checkInitialization(); final LogEntryProto entry = trx.getLogEntry(); LogServiceRequestProto logServiceRequestProto = LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData()); - CompletableFuture<Message> f = null; switch (logServiceRequestProto.getRequestCase()) { - case LOGMESSAGE: - org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage logMessage2 = logServiceRequestProto.getLogMessage(); - final LogMessage logMessage = LogMessage.parseFrom((entry.getStateMachineLogEntry().getLogData())); - - final long index = entry.getIndex(); - Long val = null; - LogName name = null; - try (final AutoCloseableLock writeLock = writeLock()) { - name = logMessage.getLogName(); - long dataLength = logMessage.getData().length; - val = state.get(name); - if (val == null) { - val = new Long(0); - } - state.put(name, val + dataLength); - updateLastAppliedTermIndex(entry.getTerm(), index); - } - f = - CompletableFuture.completedFuture(new LogMessage(name, val)); - final RaftProtos.RaftPeerRole role = trx.getServerRole(); - LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, logMessage, val); - if (LOG.isTraceEnabled()) { - LOG.trace("{}-{}: variables={}", getId(), index, state); - } - return f; - case CREATELOG: - return processCreateLogRequest(logServiceRequestProto); - case LISTLOGS: - return processListLogsRequest(); - case GETLOG: - return processGetLogRequest(logServiceRequestProto); - case GETSTATE: - return processGetStateRequest(logServiceRequestProto); - case ARCHIVELOG: - return processArchiveLog(logServiceRequestProto); - case CLOSELOG: - return processCloseLog(logServiceRequestProto); - case DELETELOG: - return processDeleteLog(logServiceRequestProto); - default: - return null; + + case CREATELOG: + return processCreateLogRequest(logServiceRequestProto); + case ARCHIVELOG: + return processArchiveLog(logServiceRequestProto); + case CLOSELOG: + return processCloseLog(logServiceRequestProto); + case DELETELOG: + return processDeleteLog(logServiceRequestProto); + case APPENDREQUEST: + return processAppendRequest(trx, logServiceRequestProto); + case SYNCREQUEST: + return processSyncRequest(trx, logServiceRequestProto); + default: + //TODO + return null; } - } catch (InvalidProtocolBufferException e) { + } catch (IOException e) { // TODO exception handling throw new RuntimeException(e); } @@ -285,26 +417,31 @@ public class LogStateMachine extends BaseStateMachine { } state.put(name, val); } + //TODO This can't be part of a state machine (REMOVE) return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil - .toCreateLogReplyProto(new BaseLogStream(name, State.OPEN, val)).toByteString())); + .toCreateLogReplyProto( + new LogStreamImpl(name, null, new LogServiceConfiguration())).toByteString())); } + //TODO REMOVE this code private CompletableFuture<Message> processListLogsRequest() { List<LogStream> logStreams = new ArrayList<LogStream>(state.size()); for (Entry<LogName, Long> e : state.entrySet()) { - logStreams.add(new BaseLogStream(e.getKey(), State.OPEN, e.getValue())); + logStreams.add(new LogStreamImpl(e.getKey(), null, new LogServiceConfiguration())); } return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil .toListLogLogsReplyProto(logStreams).toByteString())); } + //TODO REMOVE this code + private CompletableFuture<Message> processGetLogRequest( LogServiceRequestProto logServiceRequestProto) { GetLogRequestProto getLog = logServiceRequestProto.getGetLog(); LogName logName = LogServiceProtoUtil.toLogName(getLog.getLogName()); if (state.containsKey(logName)) { return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil - .toGetLogReplyProto(new BaseLogStream(logName, State.OPEN, state.get(logName))) + .toGetLogReplyProto(new LogStreamImpl(logName, null, new LogServiceConfiguration())) .toByteString())); } else { return CompletableFuture.completedFuture(Message.valueOf(GetLogReplyProto.newBuilder() http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java index fea213e..015b90c 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java @@ -17,13 +17,13 @@ */ package org.apache.ratis.logservice.api; -import java.io.IOException; +import java.util.Collection; import java.util.Set; /** * A distributed log with "infinite" length that supports reads and writes. */ -public interface LogStream { +public interface LogStream extends AutoCloseable{ /** * An enumeration that defines the current state of a LogStream @@ -70,10 +70,30 @@ public interface LogStream { /** * Returns all {@link RecordListeners} for this LogStream. */ - Set<RecordListener> getRecordListeners(); + Collection<RecordListener> getRecordListeners(); /** * Returns a copy of the Configuration for this LogStream. */ - LogStreamConfiguration getConfiguration(); + LogServiceConfiguration getConfiguration(); + + /** + * Add new log record listener + * @param listener listener + */ + void addRecordListener(RecordListener listener); + + + /** + * Remove record listener + * @param listener listener + * @return true, if successful, false - otherwise + */ + boolean removeRecordListener (RecordListener listener); + + /** + * Get log service + * @return log service instance + */ + LogService getLogService(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java deleted file mode 100644 index 12aa030..0000000 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.logservice.api; - -import java.util.Map; -import java.util.Map.Entry; - -/** - * An encapsulation of configuration for a LogStream. - */ -public interface LogStreamConfiguration { - - /** - * Fetches the value for the given key from the configuration. If there is no entry for - * the given key, {@code null} is returned. - * - * @param key The configuration key - */ - String get(String key); - - /** - * Sets the given key and value into this configuration. The configuration key may - * not be null. A null value removes the key from the configuration. - * - * @param key Configuration key, must be non-null - * @param value Configuration value - */ - void set(String key, String value); - - /** - * Removes any entry with the given key from the configuration. If there is no entry - * for the given key, this method returns without error. The provided key must be - * non-null. - * - * @param key The configuration key, must be non-null - */ - void remove(String key); - - /** - * Sets the collection of key-value pairs into the configuration. This is functionally - * equivalent to calling {@link #set(String, String)} numerous time. - */ - void setMany(Iterable<Entry<String,String>> entries); - - /** - * Returns an immutable view over the configuration as a {@code Map}. - */ - Map<String,String> asMap(); -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java index 691556a..4393e70 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java @@ -24,11 +24,12 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.ratis.client.RaftClient; import org.apache.ratis.logservice.api.LogName; import org.apache.ratis.logservice.api.LogService; import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogStream.State; -import org.apache.ratis.logservice.api.LogStreamConfiguration; +import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.api.RecordListener; public class DummyLogService implements LogService { @@ -40,7 +41,7 @@ public class DummyLogService implements LogService { } @Override - public LogStream createLog(LogName name, LogStreamConfiguration config) { + public LogStream createLog(LogName name, LogServiceConfiguration config) { return new DummyLogStream(this, name); } @@ -65,7 +66,7 @@ public class DummyLogService implements LogService { @Override public void deleteLog(LogName name) {} - @Override public void updateConfiguration(LogName name, LogStreamConfiguration config) {} + @Override public void updateConfiguration(LogName name, LogServiceConfiguration config) {} @Override public void addRecordListener(LogName name, RecordListener listener) { recordListeners.compute(name, (key, currentValue) -> { @@ -77,16 +78,29 @@ public class DummyLogService implements LogService { }); } - @Override public void removeRecordListener(LogName name, RecordListener listener) { - recordListeners.compute(name, (key, currentValue) -> { + @Override public boolean removeRecordListener(LogName name, RecordListener listener) { + Set<RecordListener> result = recordListeners.compute(name, (key, currentValue) -> { if (currentValue == null) { return null; } currentValue.remove(listener); return currentValue; }); + return result.size() > 0; } @Override public void close() throws IOException {} + @Override + public RaftClient getRaftClient() { + // TODO Auto-generated method stub + return null; + } + + @Override + public LogServiceConfiguration getConfiguration() { + // TODO Auto-generated method stub + return null; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java index c52a160..a931951 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java @@ -23,8 +23,9 @@ import java.util.Set; import org.apache.ratis.logservice.api.LogName; import org.apache.ratis.logservice.api.LogReader; +import org.apache.ratis.logservice.api.LogService; import org.apache.ratis.logservice.api.LogStream; -import org.apache.ratis.logservice.api.LogStreamConfiguration; +import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.api.LogWriter; import org.apache.ratis.logservice.api.RecordListener; @@ -77,7 +78,31 @@ public class DummyLogStream implements LogStream { } @Override - public LogStreamConfiguration getConfiguration() { + public LogServiceConfiguration getConfiguration() { + return null; + } + + @Override + public void close() throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public void addRecordListener(RecordListener listener) { + // TODO Auto-generated method stub + + } + + @Override + public boolean removeRecordListener(RecordListener listener) { + // TODO Auto-generated method stub + return false; + } + + @Override + public LogService getLogService() { + // TODO Auto-generated method stub return null; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java deleted file mode 100644 index 4fe830e..0000000 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/BaseLogStream.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.logservice.impl; - -import java.util.Set; - -import org.apache.ratis.logservice.api.LogName; -import org.apache.ratis.logservice.api.LogReader; -import org.apache.ratis.logservice.api.LogStream; -import org.apache.ratis.logservice.api.LogStreamConfiguration; -import org.apache.ratis.logservice.api.LogWriter; -import org.apache.ratis.logservice.api.RecordListener; - -public class BaseLogStream implements LogStream { - private LogName logName; - private State state; - private long size; - - public BaseLogStream(LogName logName, State state, long size) { - this.logName = logName; - this.state = state; - this.size = size; - } - - @Override - public LogName getName() { - return logName; - } - - @Override - public State getState() { - return state; - } - - @Override - public long getSize() { - return size; - } - - @Override - public LogReader createReader() { - // TODO Auto-generated method stub - return null; - } - - @Override - public LogWriter createWriter() { - // TODO Auto-generated method stub - return null; - } - - @Override - public long getLastRecordId() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public Set<RecordListener> getRecordListeners() { - // TODO Auto-generated method stub - return null; - } - - @Override - public LogStreamConfiguration getConfiguration() { - // TODO Auto-generated method stub - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java new file mode 100644 index 0000000..ae4f2de --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.logservice.api.LogReader; +import org.apache.ratis.logservice.api.LogServiceConfiguration; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.util.LogServiceProtoUtil; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException; +import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogReplyProto; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LogReaderImpl implements LogReader { + public static final Logger LOG = LoggerFactory.getLogger(LogReaderImpl.class); + + /* + * Parent log stream + */ + private LogStream parent; + /* + * Raft client + */ + private RaftClient raftClient; + /* + * Log service configuration object + */ + private LogServiceConfiguration config; + + /* + * offset + */ + long currentRecordId; + + public LogReaderImpl(LogStream logStream) { + this.parent = logStream; + this.raftClient = logStream.getLogService().getRaftClient(); + this.config = logStream.getConfiguration(); + } + + @Override + public void seek(long recordId) throws IOException { + this.currentRecordId = recordId; + } + + @Override + public ByteBuffer readNext() throws IOException { + int num = 1; + RaftClientReply reply = + raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil + .toReadLogRequestProto(parent.getName(), currentRecordId, num) + .toByteString())); + ReadLogReplyProto proto = ReadLogReplyProto.parseFrom(reply.getMessage().getContent()); + if (proto.hasException()) { + LogServiceException e = proto.getException(); + throw new IOException(e.getErrorMsg()); + } + proto.getLogRecord(0); + currentRecordId++; + return ByteBuffer.wrap(proto.getLogRecord(0).toByteArray()); + } + + @Override + public void readNext(ByteBuffer buffer) throws IOException { + int num = 1; + RaftClientReply reply = + raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil + .toReadLogRequestProto(parent.getName(), currentRecordId, num) + .toByteString())); + ReadLogReplyProto proto = ReadLogReplyProto.parseFrom(reply.getMessage().getContent()); + if (proto.hasException()) { + LogServiceException e = proto.getException(); + throw new IOException(e.getErrorMsg()); + } + currentRecordId++; + //TODO limits + buffer.put(proto.getLogRecord(0).toByteArray()); + } + + @Override + public List<ByteBuffer> readBulk(int numRecords) throws IOException { + RaftClientReply reply = + raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil + .toReadLogRequestProto(parent.getName(), currentRecordId, numRecords) + .toByteString())); + ReadLogReplyProto proto = ReadLogReplyProto.parseFrom(reply.getMessage().getContent()); + if (proto.hasException()) { + LogServiceException e = proto.getException(); + throw new IOException(e.getErrorMsg()); + } + //TODO correct current record + currentRecordId+= numRecords; + List<ByteBuffer> ret = new ArrayList<ByteBuffer>(); + int n = proto.getLogRecordCount(); + for(int i=0; i < n; i++) { + ret.add(ByteBuffer.wrap(proto.getLogRecord(i).toByteArray())); + } + return ret; + } + + @Override + public int readBulk(List<ByteBuffer> buffers) throws IOException { + RaftClientReply reply = + raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil + .toReadLogRequestProto(parent.getName(), currentRecordId, buffers.size()) + .toByteString())); + ReadLogReplyProto proto = ReadLogReplyProto.parseFrom(reply.getMessage().getContent()); + if (proto.hasException()) { + LogServiceException e = proto.getException(); + throw new IOException(e.getErrorMsg()); + } + //TODO correct current record + int n = proto.getLogRecordCount(); + currentRecordId += n; + for(int i=0; i< n; i++) { + buffers.get(i).put(proto.getLogRecord(i).toByteArray()); + } + return n; + } + + @Override + public long getPosition() { + return currentRecordId; + } + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java index 407e498..681a4ab 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogServiceImpl.java @@ -22,12 +22,11 @@ import java.util.Iterator; import java.util.List; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.logservice.api.LogName; import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogStream.State; -import org.apache.ratis.logservice.api.LogStreamConfiguration; import org.apache.ratis.logservice.api.RecordListener; import org.apache.ratis.logservice.util.LogServiceProtoUtil; import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto; @@ -45,9 +44,11 @@ import org.apache.ratis.protocol.RaftClientReply; public class LogServiceImpl implements LogService { final private RaftClient raftClient; + final private LogServiceConfiguration config; - public LogServiceImpl(RaftClient raftClient) { + public LogServiceImpl(RaftClient raftClient, LogServiceConfiguration config) { this.raftClient = raftClient; + this.config = config; } @Override @@ -56,32 +57,28 @@ public class LogServiceImpl implements LogService { raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name) .toByteString())); CreateLogReplyProto parseFrom = CreateLogReplyProto.parseFrom(reply.getMessage().getContent()); - return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream()); + return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); } - @Override - public LogStream createLog(LogName name, LogStreamConfiguration config) throws IOException { - // TODO need to make changes in the create log request to pass config. - return createLog(name); - } + @Override public LogStream getLog(LogName name) throws IOException { RaftClientReply reply = - raftClient.send(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name) + raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetLogRequestProto(name) .toByteString())); GetLogReplyProto parseFrom = GetLogReplyProto.parseFrom(reply.getMessage().getContent()); - return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream()); + return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); } @Override public Iterator<LogStream> listLogs() throws IOException { RaftClientReply reply = raftClient - .send(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString())); + .sendReadOnly(Message.valueOf(LogServiceProtoUtil.toListLogRequestProto().toByteString())); ListLogsReplyProto parseFrom = ListLogsReplyProto.parseFrom(reply.getMessage().getContent()); List<LogStreamProto> logStremsList = parseFrom.getLogStremsList(); - return LogServiceProtoUtil.toListLogStreams(logStremsList).iterator(); + return LogServiceProtoUtil.toListLogStreams(logStremsList, this).iterator(); } @Override @@ -95,7 +92,7 @@ public class LogServiceImpl implements LogService { @Override public State getState(LogName name) throws IOException { RaftClientReply reply = - raftClient.send(Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name) + raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name) .toByteString())); GetStateReplyProto parseFrom = GetStateReplyProto.parseFrom(reply.getMessage().getContent()); return parseFrom.getState() == LogStreamState.OPEN ? State.OPEN : State.CLOSED; @@ -118,28 +115,50 @@ public class LogServiceImpl implements LogService { DeleteLogReplyProto parseFrom = DeleteLogReplyProto.parseFrom(reply.getMessage().getContent()); } + @Override - public void updateConfiguration(LogName name, LogStreamConfiguration config) { + public void addRecordListener(LogName name, RecordListener listener) { // TODO Auto-generated method stub } @Override - public void addRecordListener(LogName name, RecordListener listener) { + public boolean removeRecordListener(LogName name, RecordListener listener) { // TODO Auto-generated method stub - + return false; } @Override - public void removeRecordListener(LogName name, RecordListener listener) { + public void close() throws IOException { // TODO Auto-generated method stub } @Override - public void close() throws IOException { + public LogStream createLog(LogName name, LogServiceConfiguration config) throws IOException { + //TODO configuration + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil.toCreateLogRequestProto(name) + .toByteString())); + CreateLogReplyProto parseFrom = CreateLogReplyProto.parseFrom(reply.getMessage().getContent()); + return LogServiceProtoUtil.toLogStream(parseFrom.getLogStream(), this); + } + + @Override + public void updateConfiguration(LogName name, LogServiceConfiguration config) { // TODO Auto-generated method stub } + + @Override + public RaftClient getRaftClient() { + return raftClient; + } + + @Override + public LogServiceConfiguration getConfiguration() { + return config; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java new file mode 100644 index 0000000..06a28f5 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogReader; +import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.api.LogServiceConfiguration; +import org.apache.ratis.logservice.api.LogWriter; +import org.apache.ratis.logservice.api.RecordListener; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LogStreamImpl implements LogStream { + public static final Logger LOG = LoggerFactory.getLogger(LogStreamImpl.class); + + /* + * Log stream listeners + */ + List<RecordListener> listeners; + /* + * Log stream name + */ + LogName name; + /* + * Parent log service instance + */ + LogService service; + /* + * Log stream configuration + */ + LogServiceConfiguration config; + /* + * State + */ + LogStream.State state; + + /* + * Length + */ + long length; + + public LogStreamImpl(LogStreamProto proto, LogService service) { + this.service = service; + this.name = LogName.of(proto.getLogName().getName()); + this.config = service.getConfiguration(); + init(); + } + + public LogStreamImpl(LogName name, LogService logService) { + this.service = logService; + this.name = name; + this.config = this.service.getConfiguration(); + init(); + } + + public LogStreamImpl(LogName name, LogService logService, LogServiceConfiguration config) { + this.service = logService; + this.name = name; + this.config = config; + init(); + } + + private void init() { + // TODO create new state machine. etc + state = State.OPEN; + listeners = Collections.synchronizedList(new ArrayList<RecordListener>()); + } + + @Override + public LogName getName() { + return name; + } + + @Override + public State getState() { + return state; + } + + @Override + public long getSize() { + // TODO use raft client to query state machine + return 0; + } + + @Override + public LogReader createReader() { + return new LogReaderImpl(this); + } + + @Override + public LogWriter createWriter() { + return new LogWriterImpl(this); + } + + @Override + public long getLastRecordId() { + // TODO use raft client to query state machine + return 0; + } + + @Override + public Collection<RecordListener> getRecordListeners() { + return listeners; + } + + @Override + public LogServiceConfiguration getConfiguration() { + return config; + } + + @Override + public void close() throws Exception { + // TODO Auto-generated method stub + state = State.CLOSED; + } + + @Override + public void addRecordListener(RecordListener listener) { + synchronized (listeners) { + if (!listeners.contains(listener)) { + listeners.add(listener); + } + } + } + + @Override + public boolean removeRecordListener(RecordListener listener) { + return listeners.remove(listener); + } + + @Override + public LogService getLogService() { + return service; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java new file mode 100644 index 0000000..92082ab --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.logservice.api.LogServiceConfiguration; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.api.LogWriter; +import org.apache.ratis.logservice.util.LogServiceProtoUtil; +import org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException; +import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogReplyProto; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LogWriterImpl implements LogWriter { + public static final Logger LOG = LoggerFactory.getLogger(LogWriterImpl.class); + + /* + * Parent log stream + */ + private LogStream parent; + /* + * Raft client + */ + private RaftClient raftClient; + /* + * Log service configuration object + */ + private LogServiceConfiguration config; + + public LogWriterImpl(LogStream logStream) { + this.parent = logStream; + this.raftClient = logStream.getLogService().getRaftClient(); + this.config = logStream.getConfiguration(); + } + + @Override + public long write(ByteBuffer data) throws IOException { + List<ByteBuffer> list = new ArrayList<ByteBuffer>(); + list.add(data); + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil + .toAppendBBEntryLogRequestProto(parent.getName(), list) + .toByteString())); + AppendLogEntryReplyProto proto = AppendLogEntryReplyProto.parseFrom(reply.getMessage().getContent()); + if (proto.hasException()) { + LogServiceException e = proto.getException(); + throw new IOException(e.getErrorMsg()); + } + //TODO current record id + return 0; + } + + @Override + public long sync() throws IOException { + RaftClientReply reply = + raftClient.send(Message.valueOf(LogServiceProtoUtil + .toSyncLogRequestProto(parent.getName()) + .toByteString())); + SyncLogReplyProto proto = SyncLogReplyProto.parseFrom(reply.getMessage().getContent()); + if (proto.hasException()) { + LogServiceException e = proto.getException(); + throw new IOException(e.getErrorMsg()); + } + //TODO current record id + return 0; + } + + @Override + public void close() throws IOException { + //TODO + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java index 998dc17..2e1b8da 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java @@ -1,39 +1,63 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.ratis.logservice.util; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; -import org.apache.ratis.logservice.api.LogMessage; import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogService; import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogStream.State; -import org.apache.ratis.logservice.impl.BaseLogStream; +import org.apache.ratis.logservice.impl.LogStreamImpl; +import org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.AppendLogEntryRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.ArchiveLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.CloseLogRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.CreateLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.DeleteLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogLengthRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogStartIndexReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.GetLogStartIndexRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.GetStateRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto; import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsReplyProto.Builder; import org.apache.ratis.proto.logservice.LogServiceProtos.ListLogsRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.LogNameProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceException; import org.apache.ratis.proto.logservice.LogServiceProtos.LogServiceRequestProto; import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamProto; import org.apache.ratis.proto.logservice.LogServiceProtos.LogStreamState; +import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.ReadLogRequestProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogReplyProto; +import org.apache.ratis.proto.logservice.LogServiceProtos.SyncLogRequestProto; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; /** * Licensed to the Apache Software Foundation (ASF) under one @@ -78,6 +102,11 @@ public class LogServiceProtoUtil { return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build(); } + public static CloseLogReplyProto toCloseLogReplyProto() { + CloseLogReplyProto.Builder builder = CloseLogReplyProto.newBuilder(); + return builder.build(); + } + public static LogServiceRequestProto toGetStateRequestProto(LogName logName) { LogNameProto logNameProto = LogNameProto.newBuilder().setName(logName.getName()).build(); GetStateRequestProto getState = @@ -99,15 +128,9 @@ public class LogServiceProtoUtil { return LogServiceRequestProto.newBuilder().setDeleteLog(deleteLog).build(); } - public static LogMessage toLogMessage( - org.apache.ratis.proto.logservice.LogServiceProtos.LogMessage message) { - if (!message.getData().isEmpty()) { - return new LogMessage(LogName.of(message.getLogName()), message.getData().toByteArray()); - } else if (message.getLength() != 0) { - return new LogMessage(LogName.of(message.getLogName()), message.getLength()); - } else { - return new LogMessage(LogName.of(message.getLogName())); - } + public static DeleteLogReplyProto toDeleteLogReplyProto() { + DeleteLogReplyProto.Builder builder = DeleteLogReplyProto.newBuilder(); + return builder.build(); } public static LogNameProto toLogNameProto(LogName logName) { @@ -132,10 +155,8 @@ public class LogServiceProtoUtil { return logStreamProto; } - public static LogStream toLogStream(LogStreamProto logStream) { - return new BaseLogStream(toLogName(logStream.getLogName()), - (logStream.getState() == LogStreamState.OPEN ? State.OPEN : State.CLOSED), - logStream.getSize()); + public static LogStream toLogStream(LogStreamProto logStream, LogService parent) { + return new LogStreamImpl(logStream, parent); } public static CreateLogReplyProto toCreateLogReplyProto(LogStream logStream) { @@ -160,14 +181,86 @@ public class LogServiceProtoUtil { return newBuilder.build(); } - public static List<LogStream> toListLogStreams(List<LogStreamProto> logStreamProtos) { + public static ArchiveLogReplyProto toArchiveLogReplyProto() { + ArchiveLogReplyProto.Builder builder = ArchiveLogReplyProto.newBuilder(); + return builder.build(); + } + + public static LogServiceRequestProto toGetLengthRequestProto(LogName name) { + LogNameProto logNameProto = + LogNameProto.newBuilder().setName(name.getName()).build(); + GetLogLengthRequestProto.Builder builder = GetLogLengthRequestProto.newBuilder(); + builder.setLogName(logNameProto); + return LogServiceRequestProto.newBuilder().setLengthQuery(builder.build()).build(); + } + + public static LogServiceRequestProto toGetStartIndexProto(LogName name) { + LogNameProto logNameProto = + LogNameProto.newBuilder().setName(name.getName()).build(); + GetLogStartIndexRequestProto.Builder builder = GetLogStartIndexRequestProto.newBuilder(); + builder.setLogName(logNameProto); + return LogServiceRequestProto.newBuilder().setStartIndexQuery(builder.build()).build(); + } + + public static LogServiceRequestProto toReadLogRequestProto(LogName name, long start, int total) { + LogNameProto logNameProto = + LogNameProto.newBuilder().setName(name.getName()).build(); + ReadLogRequestProto.Builder builder = ReadLogRequestProto.newBuilder(); + builder.setLogName(logNameProto); + builder.setStartRecordId(start); + builder.setNumRecords(total); + return LogServiceRequestProto.newBuilder().setReadNextQuery(builder.build()).build(); + } + + public static LogServiceRequestProto toSyncLogRequestProto(LogName name) { + LogNameProto logNameProto = + LogNameProto.newBuilder().setName(name.getName()).build(); + SyncLogRequestProto.Builder builder = SyncLogRequestProto.newBuilder(); + builder.setLogName(logNameProto); + return LogServiceRequestProto.newBuilder().setSyncRequest(builder.build()).build(); + } + + public static LogServiceRequestProto toAppendEntryLogRequestProto(LogName name, + List<byte[]> entries) { + LogNameProto logNameProto = + LogNameProto.newBuilder().setName(name.getName()).build(); + AppendLogEntryRequestProto.Builder builder = AppendLogEntryRequestProto.newBuilder(); + builder.setLogName(logNameProto); + for (int i=0; i < entries.size(); i++) { + builder.addData(ByteString.copyFrom(entries.get(i))); + } + return LogServiceRequestProto.newBuilder().setAppendRequest(builder.build()).build(); + } + + public static LogServiceRequestProto toAppendBBEntryLogRequestProto(LogName name, + List<ByteBuffer> entries) { + LogNameProto logNameProto = + LogNameProto.newBuilder().setName(name.getName()).build(); + AppendLogEntryRequestProto.Builder builder = AppendLogEntryRequestProto.newBuilder(); + builder.setLogName(logNameProto); + for (int i=0; i < entries.size(); i++) { + builder.addData(ByteString.copyFrom(entries.get(i))); + } + return LogServiceRequestProto.newBuilder().setAppendRequest(builder.build()).build(); + } + + public static List<LogStream> toListLogStreams(List<LogStreamProto> logStreamProtos, + LogService parent) { List<LogStream> logStreams = new ArrayList<>(logStreamProtos.size()); for (LogStreamProto proto : logStreamProtos) { - logStreams.add(toLogStream(proto)); + logStreams.add(toLogStream(proto, parent)); } return logStreams; } + public static List<byte[]> toListByteArray(List<ByteString> list) { + List<byte[]> retVal = new ArrayList<byte[]>(list.size()); + for(int i=0; i < list.size(); i++) { + retVal.add(list.get(i).toByteArray()); + } + return retVal; + } + public static GetLogReplyProto toGetLogReplyProto(LogStream logStream) { return GetLogReplyProto.newBuilder().setLogStream(toLogStreamProto(logStream)).build(); } @@ -176,4 +269,79 @@ public class LogServiceProtoUtil { return GetStateReplyProto.newBuilder() .setState(exists ? LogStreamState.OPEN : LogStreamState.CLOSED).build(); } + + + public static GetLogLengthReplyProto toGetLogLengthReplyProto(long length, Throwable t) { + GetLogLengthReplyProto.Builder builder = GetLogLengthReplyProto.newBuilder(); + if (t != null) { + builder.setException(toLogException(t)); + } else { + builder.setLength(length); + } + return builder.build(); + } + + public static GetLogStartIndexReplyProto toGetLogStartIndexReplyProto(long length, Throwable t) { + GetLogStartIndexReplyProto.Builder builder = GetLogStartIndexReplyProto.newBuilder(); + if (t != null) { + builder.setException(toLogException(t)); + } else { + builder.setStartIndex(length); + } + return builder.build(); + } + + public static ReadLogReplyProto toReadLogReplyProto(List<byte[]> entries, Throwable t) { + ReadLogReplyProto.Builder builder = ReadLogReplyProto.newBuilder(); + if (t != null) { + builder.setException(toLogException(t)); + } else { + for(byte[] record: entries) { + builder.addLogRecord( ByteString.copyFrom(record)); + } + } + return builder.build(); + } + + public static AppendLogEntryReplyProto toAppendLogReplyProto(List<Long> ids, Throwable t) { + AppendLogEntryReplyProto.Builder builder = AppendLogEntryReplyProto.newBuilder(); + if (t!= null) { + builder.setException(toLogException(t)); + } else if (ids != null){ + int index = 0; + for(long id: ids) { + builder.setRecordId(index++, id); + } + } + return builder.build(); + } + + public static SyncLogReplyProto toSyncLogReplyProto(Throwable t) { + SyncLogReplyProto.Builder builder = SyncLogReplyProto.newBuilder(); + if (t != null) { + builder.setException(toLogException(t)); + } + return builder.build(); + } + + public GetLogLengthReplyProto toGetLogLengthReplyProto(long length) { + GetLogLengthReplyProto.Builder builder = GetLogLengthReplyProto.newBuilder(); + builder.setLength(length); + return builder.build(); + } + + public static LogServiceException toLogException(Throwable t) { + LogServiceException.Builder builder = LogServiceException.newBuilder(); + builder.setExceptionClassName(t.getClass().getName()); + builder.setErrorMsg(t.getMessage()); + StackTraceElement[] trace = t.getStackTrace(); + StringBuffer buf = new StringBuffer(); + for (StackTraceElement el: trace) { + buf.append(el.toString()).append("\n"); + } + String strace = buf.toString(); + builder.setStacktrace(ByteString.copyFrom(strace, Charset.defaultCharset())); + return builder.build(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java deleted file mode 100644 index 3709b6f..0000000 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.logservice.util; - -public class Utils { - - - public static int long2bytes(long v, byte[] buf, int offset) { - int2bytes((int)(v >>> 32), buf, offset); - int2bytes((int) v , buf, offset + 4); - return 8; - } - - public static int bytes2int(byte[] buf, int offset) { - return (buf[offset] << 24) - + ((0xFF & buf[offset + 1]) << 16) - + ((0xFF & buf[offset + 2]) << 8) - + (0xFF & buf[offset + 3]); - } - - public static long bytes2long(byte[] buf, int offset) { - return ((long)bytes2int(buf, offset) << 32) - + (0xFFFFFFFFL & bytes2int(buf, offset + 4)); - } - - public static int int2bytes(int v, byte[] buf, int offset) { - buf[offset ] = (byte) (v >>> 24); - buf[offset + 1] = (byte) (v >>> 16); - buf[offset + 2] = (byte) (v >>> 8); - buf[offset + 3] = (byte) (v); - return 4; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java index ecdc905..8255351 100644 --- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceBaseTest.java @@ -28,6 +28,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.logservice.api.LogName; import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.api.LogServiceConfiguration; import org.apache.ratis.logservice.api.LogStateMachine; import org.apache.ratis.logservice.api.LogStream; import org.apache.ratis.logservice.api.LogStream.State; @@ -47,6 +48,7 @@ public abstract class LogServiceBaseTest<CLUSTER extends MiniRaftCluster> final RaftProperties p = getProperties(); p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, LogStateMachine.class, StateMachine.class); + LOG.info("Set LogStateMachine OK"); } static final int NUM_PEERS = 3; @@ -64,7 +66,8 @@ public abstract class LogServiceBaseTest<CLUSTER extends MiniRaftCluster> RaftClient raftClient = RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup()) .build(); - LogService logService = LogServiceFactory.getInstance().createLogService(raftClient); + LogService logService = LogServiceFactory.getInstance().createLogService(raftClient, + new LogServiceConfiguration()); LogName logName = LogName.of("log1"); LogStream logStream = logService.createLog(logName); assertEquals("log1", logStream.getName().getName()); @@ -81,7 +84,7 @@ public abstract class LogServiceBaseTest<CLUSTER extends MiniRaftCluster> State state = logService.getState(logName); assertEquals(State.OPEN, state); } - + @After public void tearDown() { cluster.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java index 9808b23..a0b38f6 100644 --- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java @@ -19,12 +19,10 @@ package org.apache.ratis.logservice.api; import static org.junit.Assert.assertEquals; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutionException; import org.apache.ratis.logservice.dummy.DummyLogService; import org.junit.Test; @@ -39,7 +37,7 @@ public class TestApiExample { } @Test - public void test() throws IOException, InterruptedException, ExecutionException { + public void test() throws Exception { try (LogService svc = new DummyLogService()) { LogStream log1 = svc.createLog(LogName.of("log1")); // Write some data http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/48d6a2a4/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java deleted file mode 100644 index d58bdda..0000000 --- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.logservice.api; - -import static org.junit.Assert.assertEquals; - -import org.apache.ratis.logservice.api.LogMessage.Type; -import org.junit.Test; - -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - -public class TestLogMessage { - - @Test - public void testLogMessages() throws InvalidProtocolBufferException { - - LogMessage msg = new LogMessage(LogName.of("testLog")); - assertEquals(Type.READ_REQUEST, msg.getType()); - ByteString content = msg.getContent(); - LogMessage other = LogMessage.parseFrom(content); - assertEquals(msg.toString(), other.toString()); - - msg = new LogMessage(LogName.of("testLog"), 100); - assertEquals(Type.READ_REPLY, msg.getType()); - content = msg.getContent(); - other = LogMessage.parseFrom(content); - assertEquals(msg.toString(), other.toString()); - - msg = new LogMessage(LogName.of("testLog"), new byte[] { 0, 0, 0 }); - assertEquals(Type.WRITE, msg.getType()); - content = msg.getContent(); - other = LogMessage.parseFrom(content); - assertEquals(msg.toString(), other.toString()); - assertEquals(msg.getData().length, other.getData().length); - - } - - -}
