Repository: incubator-ratis Updated Branches: refs/heads/master acd507e6e -> ef48512da
RATIS-272. LogService: Design ideal API. Contributed by Josh Elser Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ef48512d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ef48512d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ef48512d Branch: refs/heads/master Commit: ef48512da368f7d52ffddd3a8f02890bbdff2b39 Parents: acd507e Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Sep 7 13:19:20 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Sep 7 13:19:20 2018 -0700 ---------------------------------------------------------------------- pom.xml | 7 + ratis-logservice/pom.xml | 116 ++++++++++++++++ .../ratis/logservice/LogServiceFactory.java | 44 ++++++ .../apache/ratis/logservice/api/LogName.java | 71 ++++++++++ .../apache/ratis/logservice/api/LogReader.java | 88 ++++++++++++ .../apache/ratis/logservice/api/LogService.java | 139 +++++++++++++++++++ .../apache/ratis/logservice/api/LogStream.java | 79 +++++++++++ .../logservice/api/LogStreamConfiguration.java | 64 +++++++++ .../apache/ratis/logservice/api/LogWriter.java | 64 +++++++++ .../ratis/logservice/api/RecordListener.java | 33 +++++ .../ratis/logservice/dummy/DummyLogReader.java | 76 ++++++++++ .../ratis/logservice/dummy/DummyLogService.java | 92 ++++++++++++ .../ratis/logservice/dummy/DummyLogStream.java | 83 +++++++++++ .../ratis/logservice/dummy/DummyLogWriter.java | 51 +++++++ .../ratis/logservice/api/TestApiExample.java | 69 +++++++++ .../src/test/resources/log4j.properties | 18 +++ 16 files changed, 1094 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c3fb15a..133b0ea 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ <module>ratis-assembly</module> <module>ratis-examples</module> <module>ratis-replicated-map</module> + <module>ratis-logservice</module> </modules> <pluginRepositories> @@ -302,6 +303,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.ratis</groupId> + <artifactId>ratis-logservice</artifactId> + <version>${project.version}</version> + </dependency> + <!-- External dependencies --> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-logservice/pom.xml b/ratis-logservice/pom.xml new file mode 100644 index 0000000..bfe002e --- /dev/null +++ b/ratis-logservice/pom.xml @@ -0,0 +1,116 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>ratis</artifactId> + <groupId>org.apache.ratis</groupId> + <version>0.3.0-SNAPSHOT</version> + </parent> + + <artifactId>ratis-logservice</artifactId> + <name>Apache Ratis LogService</name> + + <dependencies> + <!-- Ratis dependencies --> + <dependency> + <artifactId>ratis-proto-shaded</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-grpc</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-netty</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <!-- Third-party dependencies --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <!-- Test Ratis dependencies --> + <dependency> + <artifactId>ratis-netty</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <artifactId>ratis-grpc</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <artifactId>ratis-hadoop</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + </dependency> + <dependency> + <artifactId>ratis-hadoop</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <artifactId>ratis-hadoop-shaded</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + </dependency> + <!-- Test third-party dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java new file mode 100644 index 0000000..4ef801e --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/LogServiceFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.logservice.api.LogService; + +public class LogServiceFactory { + private static final LogServiceFactory INSTANCE = new LogServiceFactory(); + + private LogServiceFactory() {} + + /** + * Creates an implementation of {@link LogService} using the given {@link RaftClient}. + * + * @param raftClient The client to a Raft quorum. + */ + public LogService createLogService(RaftClient raftClient) { + //TODO return new LogServiceImpl(); + return null; + } + + /** + * Returns an instance of the factory to create {@link LogService} instances. + */ + public static LogServiceFactory getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java new file mode 100644 index 0000000..3227845 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import static java.util.Objects.requireNonNull; + +import java.util.Objects; + +/** + * Identifier to uniquely identify a {@link LogStream}. + */ +public class LogName { + // It's pretty likely that what uniquely defines a LogStream + // to change over time. We should account for this by making an + // API which can naturally evolve. + private final String name; + + private LogName(String name) { + this.name = requireNonNull(name); + } + + /** + * Returns the unique name which identifies a LogStream. + * + * Impl Note: This class uses a String to uniquely identify this LogName (and the corresponding LogStream) + * from others. This is purely an implementation detail; the intent is that any data should be capable + * of identifying one LogStream/LogName from another. Users need only know how to construct a {@link LogName} + * and then use that in their application. + */ + String getName() { + return name; + } + + @Override public boolean equals(Object o) { + if (!(o instanceof LogName)) { + return false; + } + return Objects.equals(name, ((LogName) o).getName()); + } + + @Override public int hashCode() { + return name.hashCode(); + } + + @Override public String toString() { + return "LogName['" + name + "']"; + } + + /** + * Creates a {@link LogName} given the provided string. + */ + public static LogName of(String name) { + // TODO Limit allowed characters in the name? + return new LogName(name); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java new file mode 100644 index 0000000..6db3b6e --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Synchronous client interface to read from a LogStream. + */ +public interface LogReader extends AutoCloseable { + + /** + * Seeks to the position before the record at the provided {@code recordId} in the LogStream. + * + * @param offset A non-negative, offset in the LogStream + * @return A future for when the operation is completed. + */ + void seek(long recordId) throws IOException; + + /** + * Reads the next record from the LogStream at the current position and advances the current position + * to after the record which was just returned. + * + * @return The data for the next record. + */ + ByteBuffer readNext() throws IOException; + + /** + * Reads the next record from the LogStream at the current position into the provided {@link buffer} and + * advances the current position to the point after the record just read. + * + * The provided buffer must be capable of holding one complete record from the Log. If the provided buffer is + * too small, an exception will be thrown. + * + * @param buffer A buffer to read the record into + */ + void readNext(ByteBuffer buffer) throws IOException; + + /** + * Reads the next {@code numRecords} records from the LogStream, starting at the current position. This method + * may return fewer than requested records if the LogStream does not have sufficient records to return. + * + * @param numRecords The number of records to return + * @return The records, no more than the requested {@code numRecords} amount. + */ + List<ByteBuffer> readBulk(int numRecords) throws IOException; + + /** + * Fills the provided {@code List<ByteBuffer>} with records from the LogStream, starting at the current position. + * This method will attempt to fill all of the {@code ByteBuffer}'s that were provided, as long as there are + * records in the {@code LogStream} to support this. This method will return the number of buffers that were + * filled. + * + * Each provided buffer must be capable of holding one complete record from the Log. If the provided buffer is + * too small, an exception will be thrown. + * + * @param buffers A non-empty list of non-null ByteBuffers. + * @return The number of records returns, equivalent to the number of filled buffers. + */ + int readBulk(List<ByteBuffer> buffers) throws IOException; + + /** + * Returns the current position of this Reader. The position is a {@code recordId}. + */ + long getPosition(); + + /** + * Overrides {@link #close()} in {@link AutoCloseable} to throw an IOException. + */ + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java new file mode 100644 index 0000000..860cb33 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogService.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.ratis.logservice.api.LogStream.State; + +/** + * Entry point for interacting with the Ratis LogService. + */ +public interface LogService extends AutoCloseable { + /* + * How to create a LogStream + */ + + /** + * Creates a new {@link LogStream} identified by the given name with default + * configuration. Throws an exception if a {@link LogStream} with the given + * name already exists. + * + * @param name Unique name for this LogStream. + */ + LogStream createLog(LogName name); + + /** + * Creates a new {@link LogStream} identified by the given name. Throws + * an exception if a {@link LogStream} with the given name already exists. + * + * @param name Unique name for this LogStream. + * @param config Configuration object for this LogStream + */ + LogStream createLog(LogName name, LogStreamConfiguration config); + + /* + * How to get LogStreams that already exist + */ + /** + * Fetches the {@link LogStream} identified by the given name. + * + * @param name The name of the LogStream + */ + LogStream getLog(LogName name); + + /** + * Lists all {@link LogStream} instances known by this LogService. + */ + Iterator<LogStream> listLogs(); + + /* + * How to close, archive, and delete LogStreams + */ + + /** + * Moves the {@link LogStream} identified by the {@code name} from {@link State.OPEN} to {@link State.CLOSED}. + * If the log is not {@link State#OPEN}, this method returns an error. + * + * @param name The name of the log to close + */ + // TODO this name sucks, confusion WRT the Java Closeable interface. + void closeLog(LogName name); + + /** + * Returns the current {@link State} of the log identified by {@code name}. + * + * @param name The name of a log + */ + State getState(LogName name); + + /** + * Archives the given log out of the state machine and into a configurable long-term storage. A log must be + * in {@link State#CLOSED} to archive it. + * + * @param name The name of the log to archive. + */ + void archiveLog(LogName name); + + /** + * Deletes the {@link LogStream}. + * @param name The name of the LogStream + */ + void deleteLog(LogName name); + + /* + * Change the configuration of a LogStream or manipulate a LogStream's listeners + */ + + /** + * Updates a log with the new configuration object, overriding + * the previous configuration. + * + * @param config The new configuration object + */ + void updateConfiguration(LogName name, LogStreamConfiguration config); + + /** + * Registers a {@link RecordListener} with the log which will receive all records written using + * the unique name provided by {@link RecorderListener#getName()}. + * + * Impl spec: The name returned by a {@link RecordListener} instance uniquely identifies it against other + * instances. + * + * @param the log's name + * @param listener The listener to register + */ + void addRecordListener(LogName name, RecordListener listener); + + /** + * Removes a {@link RecordListener) for the log. + * + * Impl spec: The name returned by a {@link RecordListener} instance uniquely identifies it against + * other instances. + * + * @param the log's name + * @param listener The listener to remove + */ + void removeRecordListener(LogName name, RecordListener listener); + + /** + * Overrides {@link #close()} in {@link AutoCloseable} to throw an IOException. + */ + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java new file mode 100644 index 0000000..fea213e --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import java.io.IOException; +import java.util.Set; + +/** + * A distributed log with "infinite" length that supports reads and writes. + */ +public interface LogStream { + + /** + * An enumeration that defines the current state of a LogStream + */ + public enum State { + OPEN, + CLOSED; + } + + /** + * Returns the unique name to identify this log. + */ + LogName getName(); + + /** + * Returns the current state of this log. + */ + State getState(); + + /** + * Returns the size of this LogStream in bytes. + */ + long getSize(); + + /** + * Creates a reader to read this LogStream. + * + * @return A synchronous reader + */ + LogReader createReader(); + + /** + * Creates a write to write to this LogStream. + * + * @return A synchronous writer + */ + LogWriter createWriter(); + + /** + * Returns the recordId of the last record in this LogStream. For an empty log, the recordId is {@code 0}. + */ + long getLastRecordId(); + + /** + * Returns all {@link RecordListeners} for this LogStream. + */ + Set<RecordListener> getRecordListeners(); + + /** + * Returns a copy of the Configuration for this LogStream. + */ + LogStreamConfiguration getConfiguration(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java new file mode 100644 index 0000000..12aa030 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStreamConfiguration.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import java.util.Map; +import java.util.Map.Entry; + +/** + * An encapsulation of configuration for a LogStream. + */ +public interface LogStreamConfiguration { + + /** + * Fetches the value for the given key from the configuration. If there is no entry for + * the given key, {@code null} is returned. + * + * @param key The configuration key + */ + String get(String key); + + /** + * Sets the given key and value into this configuration. The configuration key may + * not be null. A null value removes the key from the configuration. + * + * @param key Configuration key, must be non-null + * @param value Configuration value + */ + void set(String key, String value); + + /** + * Removes any entry with the given key from the configuration. If there is no entry + * for the given key, this method returns without error. The provided key must be + * non-null. + * + * @param key The configuration key, must be non-null + */ + void remove(String key); + + /** + * Sets the collection of key-value pairs into the configuration. This is functionally + * equivalent to calling {@link #set(String, String)} numerous time. + */ + void setMany(Iterable<Entry<String,String>> entries); + + /** + * Returns an immutable view over the configuration as a {@code Map}. + */ + Map<String,String> asMap(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java new file mode 100644 index 0000000..5a51a3c --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogWriter.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Synchronous client interface to write to a LogStream. + */ +public interface LogWriter extends AutoCloseable { + + /** + * Appends the given data as a record in the LogStream. + * + * @param data The record to append + * @return The recordId for the record just written + */ + long write(ByteBuffer data) throws IOException; + + /** + * Appends each entry of data as a new record in the LogStream. If this method returns + * successfully, all records can be considered persisted. Otherwise, none can be assumed + * to have been written. + * + * @param records Records to append + * @return The largest recordId assigned to the records written + */ + default long write(List<ByteBuffer> records) throws IOException { + for (ByteBuffer record : records) { + write(record); + } + return records.size(); + } + + /** + * Guarantees that all previous data appended by {@link #write(ByteBuffer)} are persisted + * and durable in the LogStream. + * + * @return The recordId prior to which all records are durable + */ + long sync() throws IOException; + + /** + * Overrides {@link #close()} in {@link AutoCloseable} to throw an IOException. + */ + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/RecordListener.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/RecordListener.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/RecordListener.java new file mode 100644 index 0000000..a62d071 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/RecordListener.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * Interface that, when registered with a {@link LogStream}, will receive all records written + * to that LogStream until it is removed. + */ +public interface RecordListener extends Consumer<ByteBuffer> { + + /** + * Returns a name to identify this listener from others. + */ + String getName(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogReader.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogReader.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogReader.java new file mode 100644 index 0000000..71d2164 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogReader.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.dummy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.ratis.logservice.api.LogReader; + +public class DummyLogReader implements LogReader { + private static final byte[] IMMUTABLE_BYTES = new byte[0]; + + @Override + public void close() {} + + @Override + public void seek(long recordId) throws IOException { + // Noop. + return; + } + + @Override + public ByteBuffer readNext() throws IOException { + return ByteBuffer.wrap(IMMUTABLE_BYTES); + } + + @Override + public List<ByteBuffer> readBulk(int numRecords) throws IOException { + ArrayList<ByteBuffer> records = new ArrayList<>(numRecords); + for (int i = 0; i < numRecords; i++) { + records.add(ByteBuffer.wrap(IMMUTABLE_BYTES)); + } + return records; + } + + @Override + public void readNext(ByteBuffer buffer) throws IOException { + buffer.clear(); + if (buffer.remaining() < IMMUTABLE_BYTES.length) { + throw new IllegalArgumentException("Cannot read data into buffer of size " + buffer.remaining()); + } + buffer.put(IMMUTABLE_BYTES); + buffer.flip(); + } + + @Override + public int readBulk(List<ByteBuffer> buffers) throws IOException { + for (ByteBuffer buffer : buffers) { + readNext(buffer); + } + return buffers.size(); + } + + @Override + public long getPosition() { + // Always at the head of the list + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java new file mode 100644 index 0000000..691556a --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogService.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.dummy; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogService; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.api.LogStream.State; +import org.apache.ratis.logservice.api.LogStreamConfiguration; +import org.apache.ratis.logservice.api.RecordListener; + +public class DummyLogService implements LogService { + final ConcurrentHashMap<LogName,Set<RecordListener>> recordListeners = new ConcurrentHashMap<>(); + + @Override + public LogStream createLog(LogName name) { + return new DummyLogStream(this, name); + } + + @Override + public LogStream createLog(LogName name, LogStreamConfiguration config) { + return new DummyLogStream(this, name); + } + + @Override + public LogStream getLog(LogName name) { + return new DummyLogStream(this, name); + } + + @Override + public Iterator<LogStream> listLogs() { + return Collections.<LogStream> emptyList().iterator(); + } + + @Override public void closeLog(LogName name) {} + + @Override + public State getState(LogName name) { + return State.OPEN; + } + + @Override public void archiveLog(LogName name) {} + + @Override public void deleteLog(LogName name) {} + + @Override public void updateConfiguration(LogName name, LogStreamConfiguration config) {} + + @Override public void addRecordListener(LogName name, RecordListener listener) { + recordListeners.compute(name, (key, currentValue) -> { + if (currentValue == null) { + return new HashSet<RecordListener>(Collections.singleton(listener)); + } + currentValue.add(listener); + return currentValue; + }); + } + + @Override public void removeRecordListener(LogName name, RecordListener listener) { + recordListeners.compute(name, (key, currentValue) -> { + if (currentValue == null) { + return null; + } + currentValue.remove(listener); + return currentValue; + }); + } + + @Override public void close() throws IOException {} + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java new file mode 100644 index 0000000..c52a160 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogStream.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.dummy; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +import org.apache.ratis.logservice.api.LogName; +import org.apache.ratis.logservice.api.LogReader; +import org.apache.ratis.logservice.api.LogStream; +import org.apache.ratis.logservice.api.LogStreamConfiguration; +import org.apache.ratis.logservice.api.LogWriter; +import org.apache.ratis.logservice.api.RecordListener; + +public class DummyLogStream implements LogStream { + private final LogName name; + private final DummyLogService service; + + public DummyLogStream(DummyLogService service, LogName name) { + this.service = Objects.requireNonNull(service); + this.name = Objects.requireNonNull(name); + } + + @Override + public LogName getName() { + return name; + } + + @Override + public long getSize() { + return 0; + } + + @Override + public LogReader createReader() { + return new DummyLogReader(); + } + + @Override + public LogWriter createWriter() { + return new DummyLogWriter(); + } + + @Override + public Set<RecordListener> getRecordListeners() { + Set<RecordListener> listeners = service.recordListeners.get(name); + if (listeners == null) { + return Collections.emptySet(); + } + return Collections.unmodifiableSet(listeners); + } + + @Override + public State getState() { + return State.OPEN; + } + + @Override + public long getLastRecordId() { + return 0; + } + + @Override + public LogStreamConfiguration getConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogWriter.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogWriter.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogWriter.java new file mode 100644 index 0000000..c9f689d --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/dummy/DummyLogWriter.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.dummy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.ratis.logservice.api.LogWriter; + +public class DummyLogWriter implements LogWriter { + private final AtomicLong counter; + + public DummyLogWriter() { + this.counter = new AtomicLong(-1); + } + + @Override public void close() {} + + @Override + public long write(ByteBuffer data) throws IOException { + return counter.incrementAndGet(); + } + + @Override + public long write(List<ByteBuffer> records) throws IOException { + return counter.addAndGet(records.size()); + } + + @Override + public long sync() throws IOException { + return counter.get(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java new file mode 100644 index 0000000..9808b23 --- /dev/null +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestApiExample.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import org.apache.ratis.logservice.dummy.DummyLogService; +import org.junit.Test; + +/** + * Example usage of the LogService API with dummy objects. + */ +public class TestApiExample { + + byte[] intToBytes(int i) { + return Integer.toString(i).getBytes(StandardCharsets.UTF_8); + } + + @Test + public void test() throws IOException, InterruptedException, ExecutionException { + try (LogService svc = new DummyLogService()) { + LogStream log1 = svc.createLog(LogName.of("log1")); + // Write some data + try (LogWriter writer = log1.createWriter()) { + for (int i = 0; i < 5; i++) { + writer.write(ByteBuffer.wrap(intToBytes(i))); + } + + List<ByteBuffer> records = new ArrayList<>(5); + for (int i = 5; i < 10; i++) { + records.add(ByteBuffer.wrap(intToBytes(i))); + } + writer.write(records); + } + + // Read some data + try (LogReader reader = log1.createReader()) { + // Seek the reader + reader.seek(0); + List<ByteBuffer> records = reader.readBulk(10); + assertEquals(10, records.size()); + } + + svc.deleteLog(log1.getName()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ef48512d/ratis-logservice/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/resources/log4j.properties b/ratis-logservice/src/test/resources/log4j.properties new file mode 100644 index 0000000..ced0687 --- /dev/null +++ b/ratis-logservice/src/test/resources/log4j.properties @@ -0,0 +1,18 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
