RATIS-122. Add a FileStore example.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8a40ee4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8a40ee4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8a40ee4b Branch: refs/heads/master Commit: 8a40ee4bccc23c37027829a9cc17838200650a0d Parents: 830bd61 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Wed Nov 8 10:44:52 2017 -0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Wed Nov 8 15:02:40 2017 -0800 ---------------------------------------------------------------------- .../ratis/client/impl/ClientProtoUtils.java | 4 +- .../org/apache/ratis/util/CollectionUtils.java | 13 + .../java/org/apache/ratis/util/FileUtils.java | 9 +- .../java/org/apache/ratis/util/JavaUtils.java | 6 + .../java/org/apache/ratis/util/LogUtils.java | 73 ++++- .../java/org/apache/ratis/util/ProtoUtils.java | 30 ++- .../org/apache/ratis/util/ReflectionUtils.java | 3 +- .../java/org/apache/ratis/util/StringUtils.java | 23 +- .../java/org/apache/ratis/util/TaskQueue.java | 125 +++++++++ .../test/java/org/apache/ratis/BaseTest.java | 2 +- .../ratis/examples/filestore/FileInfo.java | 266 +++++++++++++++++++ .../ratis/examples/filestore/FileStore.java | 212 +++++++++++++++ .../examples/filestore/FileStoreClient.java | 122 +++++++++ .../examples/filestore/FileStoreCommon.java | 56 ++++ .../filestore/FileStoreStateMachine.java | 186 +++++++++++++ .../examples/filestore/FileStoreBaseTest.java | 201 ++++++++++++++ .../filestore/TestFileStoreWithGrpc.java | 25 ++ .../filestore/TestFileStoreWithNetty.java | 25 ++ .../ratis/netty/server/NettyRpcService.java | 6 +- .../src/main/proto/Examples.proto | 67 +++++ ratis-proto-shaded/src/main/proto/Raft.proto | 2 + .../ratis/server/impl/RaftServerImpl.java | 2 +- .../ratis/server/storage/RaftLogWorker.java | 37 ++- .../ratis/statemachine/BaseStateMachine.java | 6 + 
.../apache/ratis/statemachine/StateMachine.java | 13 + .../ratis/statemachine/TransactionContext.java | 11 +- .../java/org/apache/ratis/MiniRaftCluster.java | 27 +- .../org/apache/ratis/RaftExceptionBaseTest.java | 2 +- .../java/org/apache/ratis/RaftTestUtil.java | 15 +- .../server/impl/ServerInformationBaseTest.java | 9 +- 30 files changed, 1531 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 053d952..a01c376 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -23,9 +23,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.ReflectionUtils; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION; import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION; @@ -125,7 +123,7 @@ public class ClientProtoUtils { final Throwable t = sme.getCause() != null ? 
sme.getCause() : sme; smeBuilder.setExceptionClassName(t.getClass().getName()) .setErrorMsg(t.getMessage()) - .setStacktrace(ProtoUtils.toByteString(t.getStackTrace())); + .setStacktrace(ProtoUtils.writeObject2ByteString(t.getStackTrace())); b.setStateMachineException(smeBuilder.build()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java index 5ea8d4f..57222a6 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java @@ -23,6 +23,7 @@ package org.apache.ratis.util; import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -82,4 +83,16 @@ public interface CollectionUtils { INPUT[] array, Function<INPUT, OUTPUT> converter) { return as(Arrays.asList(array), converter); } + + static <K, V> void putNew(K key, V value, Map<K, V> map, Supplier<String> name) { + final V returned = map.put(key, value); + Preconditions.assertTrue(returned == null, + () -> "Entry already exists for key " + key + " in map " + name.get()); + } + + static <K, V> void replaceExisting(K key, V oldValue, V newValue, Map<K, V> map, Supplier<String> name) { + final boolean replaced = map.replace(key, oldValue, newValue); + Preconditions.assertTrue(replaced, + () -> "Entry not found for key " + key + " in map " + name.get()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java ---------------------------------------------------------------------- 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java index 38fdff3..3171d4e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; @@ -38,6 +39,12 @@ public interface FileUtils { () -> "FileOutputStream.getChannel().truncate " + f + " to target length " + target); } + static OutputStream createNewFile(Path p) throws IOException { + return LogUtils.supplyAndLog(LOG, + () -> Files.newOutputStream(p, StandardOpenOption.CREATE_NEW), + () -> "Files.newOutputStream " + StandardOpenOption.CREATE_NEW + " " + p); + } + static void createDirectories(File dir) throws IOException { createDirectories(dir.toPath()); } @@ -93,7 +100,7 @@ public interface FileUtils { */ static void deleteFully(Path p) throws IOException { if (!Files.exists(p, LinkOption.NOFOLLOW_LINKS)) { - LOG.trace("deleteFully: {} does not exist."); + LOG.trace("deleteFully: {} does not exist.", p); return; } Files.walkFileTree(p, new SimpleFileVisitor<Path>() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index ed115cb..89407eb 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -177,4 +177,10 @@ public interface JavaUtils { println.accept(ti.toString()); } } + + static <E> CompletableFuture<E> 
completeExceptionally(Throwable t) { + final CompletableFuture<E> future = new CompletableFuture<>(); + future.completeExceptionally(t); + return future; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java index ebb61be..6a4d833 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java @@ -24,6 +24,8 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.slf4j.Logger; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** @@ -40,9 +42,6 @@ public interface LogUtils { throws THROWABLE { try { op.run(); - if (log.isTraceEnabled()) { - log.trace("Executed " + opName.get() + " successfully."); - } } catch (Throwable t) { if (log.isTraceEnabled()) { log.trace("Failed to " + opName.get(), t); @@ -51,5 +50,73 @@ public interface LogUtils { } throw t; } + + if (log.isTraceEnabled()) { + log.trace("Successfully ran " + opName.get()); + } + } + + static <OUTPUT, THROWABLE extends Throwable> OUTPUT supplyAndLog( + Logger log, CheckedSupplier<OUTPUT, THROWABLE> supplier, Supplier<String> name) + throws THROWABLE { + final OUTPUT output; + try { + output = supplier.get(); + } catch (Throwable t) { + if (log.isTraceEnabled()) { + log.trace("Failed to " + name.get(), t); + } else if (log.isWarnEnabled()){ + log.warn("Failed to " + name.get() + ": " + t); + } + throw (THROWABLE)t; + } + + if (log.isTraceEnabled()) { + log.trace("Successfully supplied " + name.get()); + } + return output; + } + + static Runnable newRunnable(Logger log, Runnable runnable, Supplier<String> name) { + return new Runnable() { + 
@Override + public void run() { + runAndLog(log, runnable::run, name); + } + + @Override + public String toString() { + return name.get(); + } + }; + } + + static <T> Callable<T> newCallable(Logger log, Callable<T> callable, Supplier<String> name) { + return new Callable<T>() { + @Override + public T call() throws Exception { + return supplyAndLog(log, callable::call, name); + } + + @Override + public String toString() { + return name.get(); + } + }; + } + + static <OUTPUT, THROWABLE extends Throwable> CheckedSupplier<OUTPUT, THROWABLE> newCheckedSupplier( + Logger log, CheckedSupplier<OUTPUT, THROWABLE> supplier, Supplier<String> name) { + return new CheckedSupplier<OUTPUT, THROWABLE>() { + @Override + public OUTPUT get() throws THROWABLE { + return supplyAndLog(log, supplier, name); + } + + @Override + public String toString() { + return name.get(); + } + }; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 73e8646..2acda38 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -21,6 +21,7 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.com.google.protobuf.ServiceException; import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; import java.io.IOException; import java.io.ObjectInputStream; @@ -29,8 +30,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -public class ProtoUtils { - public static ByteString toByteString(Object obj) { +public interface ProtoUtils { + public 
static ByteString writeObject2ByteString(Object obj) { final ByteString.Output byteOut = ByteString.newOutput(); try(final ObjectOutputStream objOut = new ObjectOutputStream(byteOut)) { objOut.writeObject(obj); @@ -52,6 +53,10 @@ public class ProtoUtils { } } + static ByteString toByteString(String string) { + return ByteString.copyFromUtf8(string); + } + public static ByteString toByteString(byte[] bytes) { return toByteString(bytes, 0, bytes.length); } @@ -133,6 +138,27 @@ public class ProtoUtils { .build(); } + /** + * If the given entry is {@link LogEntryBodyCase#SMLOGENTRY} and it has state machine data, + * build a new entry without the state machine data. + * + * @return a new entry without the state machine data if the given has state machine data; + * otherwise, return the given entry. + */ + static LogEntryProto removeStateMachineData(LogEntryProto entry) { + if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) { + return entry; + } + final SMLogEntryProto smLog = entry.getSmLogEntry(); + if (smLog.getStateMachineData().isEmpty()) { + return entry; + } + // build a new LogEntryProto without state machine data + return LogEntryProto.newBuilder(entry) + .setSmLogEntry(SMLogEntryProto.newBuilder().setData(smLog.getData())) + .build(); + } + public static IOException toIOException(ServiceException se) { final Throwable t = se.getCause(); if (t == null) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java index 700965f..c185e66 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java @@ -173,7 +173,8 @@ public interface ReflectionUtils { 
ctor = Constructors.get(clazz, argClasses); } catch (NoSuchMethodException e) { throw new UnsupportedOperationException( - "Unable to find suitable constructor for class " + clazz.getName(), e); + "Unable to find suitable constructor for class " + clazz.getName() + + ", argument classes = " + Arrays.toString(argClasses), e); } return instantiate(clazz.getName(), ctor, args); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java index 7a2ddd0..07a8973 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java @@ -22,8 +22,10 @@ import org.apache.ratis.shaded.com.google.common.collect.Interners; import java.io.PrintWriter; import java.io.StringWriter; +import java.nio.ByteBuffer; import java.util.Locale; import java.util.Objects; +import java.util.function.Supplier; public class StringUtils { public static final String[] EMPTY_STRING_ARRAY = {}; @@ -64,11 +66,17 @@ public class StringUtils { public static String bytes2HexString(byte[] bytes) { Objects.requireNonNull(bytes, "bytes == null"); + return bytes2HexString(ByteBuffer.wrap(bytes)); + } + + public static String bytes2HexString(ByteBuffer bytes) { + Objects.requireNonNull(bytes, "bytes == null"); - final StringBuilder s = new StringBuilder(2 * bytes.length); - for(byte b : bytes) { - s.append(format("%02x", b)); + final StringBuilder s = new StringBuilder(2 * bytes.remaining()); + for(; bytes.remaining() > 0; ) { + s.append(format("%02x", bytes.get())); } + bytes.flip(); return s.toString(); } @@ -93,4 +101,13 @@ public class StringUtils { wrt.close(); return stm.toString(); } + + public static Object 
stringSupplierAsObject(Supplier<String> supplier) { + return new Object() { + @Override + public String toString() { + return supplier.get(); + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/main/java/org/apache/ratis/util/TaskQueue.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TaskQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/TaskQueue.java new file mode 100644 index 0000000..c72067b --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/TaskQueue.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; + +/** + * A queue with execution order guarantee such that + * each task is submitted for execution only if it becomes the head of the queue. + * Tasks are executed sequentially without any overlap. 
+ * + * By the definition of a queue, a task can become the head iff + * (1) the queue is empty when offering it, or + * (2) it is the next to the head and the head is polled out from the queue. + * + * A typical use case is to submit concurrent tasks + * with in-order guarantee for some of the tasks. + * + * One example use case is to submit tasks to write multiple files: + * - A file may require multiple write tasks. + * - Multiple files are written at the same time. + * A solution is to create a {@link TaskQueue} for each file + * and then submit the write tasks to the corresponding queue. + * The files will be written concurrently and the writes to each file are in-order. + */ +public class TaskQueue { + public static final Logger LOG = LoggerFactory.getLogger(TaskQueue.class); + + private final String name; + private final Queue<Runnable> q = new LinkedList<>(); + + public TaskQueue(String name) { + this.name = name; + } + + /** + * Poll the current head from this queue + * and then submit the next head, if there is any. + */ + private synchronized Runnable pollAndSubmit(ExecutorService executor) { + final Runnable head = q.poll(); + final Runnable next = q.peek(); + if (next != null) { + executor.submit(next); + } + return head; + } + + /** + * Offer the given task to this queue. + * If it is the first task, submit it. + */ + private synchronized void offerAndSubmit(Runnable task, ExecutorService executor) { + q.offer(task); + if (q.size() == 1) { + executor.submit(task); + } + } + + /** + * The same as submit(task, executor, Function.identity()); + */ + public <OUTPUT, THROWABLE extends Throwable> CompletableFuture<OUTPUT> submit( + CheckedSupplier<OUTPUT, THROWABLE> task, ExecutorService executor) { + return submit(task, executor, Function.identity()); + } + + /** + * Offer the given task to this queue + * and then submit the tasks one by one in the queue order for execution. + * + * @param task the task to be submitted. 
+ * @param executor to execute tasks. + * @param newThrowable When the task throws a throwable, create a new Throwable + * in order to include more error message. + * @param <OUTPUT> the output type of the task. + * @param <THROWABLE> the throwable type of the task. + * @return a future of the output. + */ + public <OUTPUT, THROWABLE extends Throwable> CompletableFuture<OUTPUT> submit( + CheckedSupplier<OUTPUT, THROWABLE> task, ExecutorService executor, + Function<Throwable, Throwable> newThrowable) { + final CompletableFuture<OUTPUT> f = new CompletableFuture<>(); + final Runnable runnable = LogUtils.newRunnable(LOG, () -> { + LOG.trace("{}: running {}", this, task); + try { + f.complete(task.get()); + } catch (Throwable e) { + f.completeExceptionally(newThrowable.apply(e)); + } + + pollAndSubmit(executor); + }, task::toString); + + offerAndSubmit(runnable, executor); + return f; + } + + @Override + public synchronized String toString() { + return name + "-" + getClass().getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java index 1c27420..f4e622a 100644 --- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java +++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java @@ -52,7 +52,7 @@ public abstract class BaseTest { private static final Supplier<File> rootTestDir = JavaUtils.memoize( () -> JavaUtils.callAsUnchecked(() -> { final File dir = new File(System.getProperty("test.build.data", "target/test/data"), - Long.toHexString(ThreadLocalRandom.current().nextLong())); + Integer.toHexString(ThreadLocalRandom.current().nextInt())); if (dir.exists() && !dir.isDirectory()) { throw new IOException(dir + " already exists and is not a directory"); } else if 
(!dir.exists() && !dir.mkdirs()) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java new file mode 100644 index 0000000..d8b3eb4 --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.examples.filestore; + +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.function.Supplier; + +abstract class FileInfo { + public static final Logger LOG = LoggerFactory.getLogger(FileInfo.class); + + private final Path relativePath; + + FileInfo(Path relativePath) { + this.relativePath = relativePath; + } + + Path getRelativePath() { + return relativePath; + } + + long getSize() { + throw new UnsupportedOperationException( + "File " + getRelativePath() + " size is unknown."); + } + + void flush() throws IOException { + // no-op + } + + ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length) + throws IOException { + flush(); + if (offset + length > getSize()) { + throw new IOException("Failed to read: offset (=" + offset + + " + length (=" + length + ") > size = " + getSize() + + ", path=" + getRelativePath()); + } + + try(final SeekableByteChannel in = Files.newByteChannel( + resolver.apply(getRelativePath()), StandardOpenOption.READ)) { + final ByteBuffer buffer = ByteBuffer.allocateDirect(FileStoreCommon.getChunkSize(length)); + in.position(offset).read(buffer); + buffer.flip(); + return ByteString.copyFrom(buffer); + } + } + + UnderConstruction asUnderConstruction() { + throw new 
UnsupportedOperationException( + "File " + getRelativePath() + " is not under construction."); + } + + static class ReadOnly extends FileInfo { + private final long size; + + ReadOnly(UnderConstruction f) { + super(f.getRelativePath()); + this.size = f.getSize(); + } + + @Override + long getSize() { + return size; + } + } + + static class FileOut implements Closeable { + private final OutputStream out; + private final WritableByteChannel channel; + + FileOut(Path p) throws IOException { + this.out = FileUtils.createNewFile(p); + this.channel = Channels.newChannel(out); + } + + int write(ByteBuffer data) throws IOException { + return channel.write(data); + } + + void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + channel.close(); + out.close(); + } + } + + static class UnderConstruction extends FileInfo { + private FileOut out; + + /** The size written to a local file. */ + private volatile long writeSize; + /** The size committed to client. */ + private volatile long committedSize; + /** The size at last flush. */ + private volatile long flushSize; + + /** A queue to make sure that the writes are in order. */ + private final TaskQueue writeQueue = new TaskQueue("writeQueue"); + /** A queue to make sure that the commits are in order. */ + private final TaskQueue commitQueue = new TaskQueue("commitQueue"); + /** Futures to make sure that each commit is executed after the corresponding write. 
*/ + private final Map<Long, CompletableFuture<Integer>> writeFutures = new ConcurrentHashMap<>(); + + UnderConstruction(Path relativePath) { + super(relativePath); + } + + @Override + UnderConstruction asUnderConstruction() { + return this; + } + + @Override + long getSize() { + return committedSize; + } + + CompletableFuture<Integer> submitCreate( + CheckedFunction<Path, Path, IOException> resolver, ByteString data, boolean close, + ExecutorService executor, RaftPeerId id, long index) { + final Supplier<String> name = () -> "create(" + getRelativePath() + ", " + + close + ") @" + id + ":" + index; + final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> { + if (out == null) { + out = new FileOut(resolver.apply(getRelativePath())); + } + return write(0L, data, close); + }, name); + return submitWrite(task, executor, id, index); + } + + CompletableFuture<Integer> submitWrite( + long offset, ByteString data, boolean close, ExecutorService executor, + RaftPeerId id, long index) { + final Supplier<String> name = () -> "write(" + getRelativePath() + ", " + + offset + ", " + close + ") @" + id + ":" + index; + final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, + () -> write(offset, data, close), name); + return submitWrite(task, executor, id, index); + } + + private CompletableFuture<Integer> submitWrite( + CheckedSupplier<Integer, IOException> task, ExecutorService executor, + RaftPeerId id, long index) { + final CompletableFuture<Integer> f = writeQueue.submit(task, executor, + e -> new IOException("Failed " + task, e)); + CollectionUtils.putNew(index, f, writeFutures, () -> id + ":writeFutures"); + return f; + } + + private int write(long offset, ByteString data, boolean close) throws IOException { + if (offset != writeSize) { + throw new IOException("Offset/size mismatched: offset = " + offset + + " != writeSize = " + writeSize + ", path=" + getRelativePath()); + } + if (out == null) { + throw new 
IOException("File output is not initialized, path=" + getRelativePath()); + } + + synchronized (out) { + int n = 0; + if (data != null) { + final ByteBuffer buffer = data.asReadOnlyByteBuffer(); + try { + for (; buffer.remaining() > 0; ) { + n += out.write(buffer); + } + } finally { + writeSize += n; + } + } + + if (close) { + out.close(); + } + return n; + } + } + + void flush() throws IOException { + if (flushSize >= committedSize) { + return; + } + synchronized (out) { + if (flushSize >= committedSize) { + return; + } + out.flush(); + flushSize = writeSize; + } + } + + CompletableFuture<Integer> submitCommit( + long offset, int size, Function<UnderConstruction, ReadOnly> converter, + ExecutorService executor, RaftPeerId id, long index) { + final Supplier<String> name = () -> "commit(" + getRelativePath() + ", " + + offset + ", " + size + ") @" + id + ":" + index; + final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> { + if (offset != committedSize) { + throw new IOException("Offset/size mismatched: offset = " + + offset + " != committedSize = " + committedSize + + ", path=" + getRelativePath()); + } else if (committedSize + size > writeSize) { + throw new IOException("Offset/size mismatched: committed (=" + committedSize + + ") + size (=" + size + ") > writeSize = " + writeSize); + } + committedSize += size; + + if (converter != null) { + converter.apply(this); + } + return size; + }, name); + + final CompletableFuture<Integer> write = writeFutures.remove(index); + if (write == null) { + return JavaUtils.completeExceptionally( + new IOException(name.get() + " is already committed.")); + } + return write.thenComposeAsync(writeSize -> { + Preconditions.assertTrue(size == writeSize); + return commitQueue.submit(task, executor, + e -> new IOException("Failed " + task, e)); + }, executor); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java new file mode 100644 index 0000000..aba2a19 --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.examples.filestore; + +import org.apache.ratis.examples.filestore.FileInfo.ReadOnly; +import org.apache.ratis.examples.filestore.FileInfo.UnderConstruction; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.proto.ExamplesProtos.*; +import org.apache.ratis.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.function.Supplier; + +public class FileStore implements Closeable { + public static final Logger LOG = LoggerFactory.getLogger(FileStore.class); + + static class FileMap { + private final Object name; + private final Map<Path, FileInfo> map = new ConcurrentHashMap<>(); + + FileMap(Supplier<String> name) { + this.name = StringUtils.stringSupplierAsObject(name); + } + + FileInfo get(String relative) throws FileNotFoundException { + return applyFunction(relative, map::get); + } + + FileInfo remove(String relative) throws FileNotFoundException { + LOG.trace("{}: remove {}", name, relative); + return applyFunction(relative, map::remove); + } + + private FileInfo applyFunction(String relative, Function<Path, FileInfo> f) + throws FileNotFoundException { + final FileInfo info = f.apply(normalize(relative)); + if (info == null) { + throw new FileNotFoundException("File " + relative + " not found in " + name); + } + return info; + } + + void putNew(UnderConstruction uc) { + LOG.trace("{}: putNew {}", name, uc.getRelativePath()); + CollectionUtils.putNew(uc.getRelativePath(), uc, map, name::toString); + } + + ReadOnly close(UnderConstruction uc) { + LOG.trace("{}: close {}", name, uc.getRelativePath()); + final ReadOnly ro = new ReadOnly(uc); + 
CollectionUtils.replaceExisting(uc.getRelativePath(), uc, ro, map, name::toString); + return ro; + } + } + + private final Supplier<RaftPeerId> idSupplier; + private final Supplier<Path> rootSupplier; + private final FileMap files; + + private final ExecutorService writer = Executors.newFixedThreadPool(10); + private final ExecutorService committer = Executors.newFixedThreadPool(3); + private final ExecutorService reader = Executors.newFixedThreadPool(10); + private final ExecutorService deleter = Executors.newFixedThreadPool(3); + + public FileStore(Supplier<RaftPeerId> idSupplier, Path dir) { + this.idSupplier = idSupplier; + this.rootSupplier = JavaUtils.memoize( + () -> dir.resolve(getId().toString()).normalize().toAbsolutePath()); + this.files = new FileMap(JavaUtils.memoize(() -> idSupplier.get() + ":files")); + } + + public RaftPeerId getId() { + return Objects.requireNonNull(idSupplier.get(), getClass().getSimpleName() + " is not initialized."); + } + + public Path getRoot() { + return rootSupplier.get(); + } + + static Path normalize(String path) { + Objects.requireNonNull(path, "path == null"); + return Paths.get(path).normalize(); + } + + Path resolve(Path relative) throws IOException { + final Path root = getRoot(); + final Path full = root.resolve(relative).normalize().toAbsolutePath(); + if (full.equals(root)) { + throw new IOException("The file path " + relative + " resolved to " + full + + " is the root directory " + root); + } else if (!full.startsWith(root)) { + throw new IOException("The file path " + relative + " resolved to " + full + + " is not a sub-path under root directory " + root); + } + return full; + } + + CompletableFuture<ReadReplyProto> read(String relative, long offset, long length) { + final Supplier<String> name = () -> "read(" + relative + + ", " + offset + ", " + length + ") @" + getId(); + final CheckedSupplier<ReadReplyProto, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> { + final FileInfo info = 
files.get(relative); + final ReadReplyProto.Builder reply = ReadReplyProto.newBuilder() + .setResolvedPath(FileStoreCommon.toByteString(info.getRelativePath())) + .setOffset(offset); + + final ByteString bytes = info.read(this::resolve, offset, length); + return reply.setData(bytes).build(); + }, name); + return submit(task, reader); + } + + CompletableFuture<Path> delete(long index, String relative) { + final Supplier<String> name = () -> "delete(" + relative + ") @" + getId() + ":" + index; + final CheckedSupplier<Path, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> { + final FileInfo info = files.remove(relative); + FileUtils.delete(resolve(info.getRelativePath())); + return info.getRelativePath(); + }, name); + return submit(task, deleter); + } + + /** + * Runs the task on the given executor and exposes its outcome as a future. + * Every failure, checked or unchecked, completes the returned future exceptionally. + */ + static <T> CompletableFuture<T> submit( + CheckedSupplier<T, IOException> task, ExecutorService executor) { + final CompletableFuture<T> f = new CompletableFuture<>(); + executor.submit(() -> { + try { + f.complete(task.get()); + } catch (IOException e) { + f.completeExceptionally(new IOException("Failed " + task, e)); + } catch (Throwable t) { + // Otherwise the failure is captured by the discarded Future and f is never completed. + f.completeExceptionally(t); + } + }); + return f; + } + + CompletableFuture<WriteReplyProto> submitCommit( + long index, String relative, boolean close, long offset, int size) { + final Function<UnderConstruction, ReadOnly> converter = close ? files::close: null; + final UnderConstruction uc; + try { + uc = files.get(relative).asUnderConstruction(); + } catch (FileNotFoundException e) { + return FileStoreCommon.completeExceptionally( + index, "Failed to write to " + relative, e); + } + + return uc.submitCommit(offset, size, converter, committer, getId(), index) + .thenApply(n -> WriteReplyProto.newBuilder() + .setResolvedPath(FileStoreCommon.toByteString(uc.getRelativePath())) + .setOffset(offset) + .setLength(n) + .build()); + } + + CompletableFuture<Integer> write( + long index, String relative, boolean close, long offset, ByteString data) { + final int size = data != null? 
data.size(): 0; + LOG.trace("write {}, offset={}, size={}, close? {} @{}:{}", + relative, offset, size, close, getId(), index); + final boolean createNew = offset == 0L; + final UnderConstruction uc; + if (createNew) { + uc = new UnderConstruction(normalize(relative)); + files.putNew(uc); + } else { + try { + uc = files.get(relative).asUnderConstruction(); + } catch (FileNotFoundException e) { + return FileStoreCommon.completeExceptionally( + index, "Failed to write to " + relative, e); + } + } + + return size == 0 && !close? CompletableFuture.completedFuture(0) + : createNew? uc.submitCreate(this::resolve, data, close, writer, getId(), index) + : uc.submitWrite(offset, data, close, writer, getId(), index); + } + + @Override + public void close() { + writer.shutdownNow(); + committer.shutdownNow(); + reader.shutdownNow(); + deleter.shutdownNow(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java new file mode 100644 index 0000000..59d5079 --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.examples.filestore; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.proto.ExamplesProtos.*; +import org.apache.ratis.util.CheckedFunction; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ProtoUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** A client that reads, writes and deletes files of the FileStore example through a RaftClient. 
*/ +public class FileStoreClient implements Closeable { + public static final Logger LOG = LoggerFactory.getLogger(FileStoreClient.class); + + private final RaftClient client; + + public FileStoreClient(RaftGroup group, RaftProperties properties) + throws IOException { + this.client = RaftClient.newBuilder() + .setProperties(properties) + .setRaftGroup(group) + .build(); + } + + @Override + public void close() throws IOException { + client.close(); + } + + static ByteString send( + ByteString request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction) + throws IOException { + final RaftClientReply reply = sendFunction.apply(() -> request); + if (reply.hasStateMachineException()) { + throw new IOException("Failed to send request " + request, reply.getStateMachineException()); + } + Preconditions.assertTrue(reply.isSuccess(), () -> "reply=" + reply); + return reply.getMessage().getContent(); + } + + private ByteString send(ByteString request) throws IOException { + return send(request, client::send); + } + + private ByteString sendReadOnly(ByteString request) throws IOException { + return send(request, client::sendReadOnly); + } + + public ByteString read(String path, long offset, long length) throws IOException { + return readImpl(path, offset, length).getData(); + } + + private ReadReplyProto readImpl(String path, long offset, long length) throws IOException { + final ReadRequestProto read = ReadRequestProto.newBuilder() + .setPath(ProtoUtils.toByteString(path)) + .setOffset(offset) + .setLength(length) + .build(); + + return ReadReplyProto.parseFrom(sendReadOnly(read.toByteString())); + } + + /** + * Writes at most one chunk of the remaining bytes of the buffer at the given file offset. + * @return the length reported in the write reply. + */ + public long write(String path, long offset, boolean close, ByteBuffer buffer) + throws IOException { + final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining()); + // limit(int) takes an absolute index, so offset the chunk size by the current position. + buffer.limit(buffer.position() + chunkSize); + final WriteReplyProto proto = writeImpl(path, offset, close, ByteString.copyFrom(buffer)); + return proto.getLength(); + } + + private WriteReplyProto 
writeImpl(String path, long offset, boolean close, ByteString data) + throws IOException { + final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder() + .setPath(ProtoUtils.toByteString(path)) + .setOffset(offset) + .setClose(close); + + final WriteRequestProto.Builder write = WriteRequestProto.newBuilder() + .setHeader(header) + .setData(data); + + final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setWrite(write).build(); + return WriteReplyProto.parseFrom(send(request.toByteString())); + } + + private DeleteReplyProto deleteImpl(String path) throws IOException { + final DeleteRequestProto.Builder delete = DeleteRequestProto.newBuilder() + .setPath(ProtoUtils.toByteString(path)); + final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setDelete(delete).build(); + return DeleteReplyProto.parseFrom(send(request.toByteString())); + } + + public void delete(String path) throws IOException { + deleteImpl(path); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java new file mode 100644 index 0000000..8a92adf --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.examples.filestore; + +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.util.*; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +public interface FileStoreCommon { + String STATEMACHINE_DIR_KEY = "example.filestore.statemachine.dir"; + + SizeInBytes MAX_CHUNK_SIZE = SizeInBytes.valueOf(64, TraditionalBinaryPrefix.MEGA); + + static int getChunkSize(long suggestedSize) { + return Math.toIntExact(Math.min(suggestedSize, MAX_CHUNK_SIZE.getSize())); + } + + static ByteString toByteString(Path p) { + return ProtoUtils.toByteString(p.toString()); + } + + static <T> CompletableFuture<T> completeExceptionally( + long index, String message) { + return completeExceptionally(index, message, null); + } + + static <T> CompletableFuture<T> completeExceptionally( + long index, String message, Throwable cause) { + return completeExceptionally(message + ", index=" + index, cause); + } + + static <T> CompletableFuture<T> completeExceptionally( + String message, Throwable cause) { + return JavaUtils.completeExceptionally( + new IOException(message).initCause(cause)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java new file mode 100644 index 0000000..538c970 --- /dev/null +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.examples.filestore; + +import org.apache.ratis.conf.ConfUtils; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.*; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.ratis.shaded.proto.ExamplesProtos.*; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.statemachine.BaseStateMachine; +import org.apache.ratis.statemachine.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +public class FileStoreStateMachine extends BaseStateMachine { + public static final Logger LOG = LoggerFactory.getLogger(FileStoreStateMachine.class); + + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); + private final AtomicReference<TermIndex> latestTermIndex = new AtomicReference<>(); + + private final FileStore files; + + public FileStoreStateMachine(RaftProperties properties) { + final File dir = ConfUtils.getFile(properties::getFile, FileStoreCommon.STATEMACHINE_DIR_KEY, null); + Objects.requireNonNull(dir, FileStoreCommon.STATEMACHINE_DIR_KEY + " is not set."); + this.files = new FileStore(this::getId, dir.toPath()); + } + + @Override + public void initialize(RaftPeerId id, RaftProperties properties, RaftStorage raftStorage) + throws IOException { + 
super.initialize(id, properties, raftStorage); + this.storage.init(raftStorage); + FileUtils.createDirectories(files.getRoot()); + } + + @Override + public StateMachineStorage getStateMachineStorage() { + return storage; + } + + @Override + public void close() { + files.close(); + latestTermIndex.set(null); + } + + @Override + public CompletableFuture<RaftClientReply> query(RaftClientRequest request) { + final ReadRequestProto proto; + try { + proto = ReadRequestProto.parseFrom(request.getMessage().getContent()); + } catch (InvalidProtocolBufferException e) { + return FileStoreCommon.completeExceptionally("Failed to parse " + request, e); + } + + final String path = proto.getPath().toStringUtf8(); + return files.read(path, proto.getOffset(), proto.getLength()) + .thenApply(reply -> new RaftClientReply(request, () -> reply.toByteString())); + } + + @Override + public TransactionContext startTransaction(RaftClientRequest request) throws IOException { + final ByteString content = request.getMessage().getContent(); + final FileStoreRequestProto proto = FileStoreRequestProto.parseFrom(content); + final SMLogEntryProto log; + if (proto.getRequestCase() == FileStoreRequestProto.RequestCase.WRITE) { + final WriteRequestProto write = proto.getWrite(); + final FileStoreRequestProto newProto = FileStoreRequestProto.newBuilder() + .setWriteHeader(write.getHeader()).build(); + log = SMLogEntryProto.newBuilder() + .setData(newProto.toByteString()) + .setStateMachineData(write.getData()) + .build(); + } else { + log = SMLogEntryProto.newBuilder().setData(content).build(); + } + + return new TransactionContext(this, request, log); + } + + @Override + public CompletableFuture<Integer> writeStateMachineData(LogEntryProto entry) { + final SMLogEntryProto smLog = entry.getSmLogEntry(); + final ByteString data = smLog.getData(); + final FileStoreRequestProto proto; + try { + proto = FileStoreRequestProto.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + return 
FileStoreCommon.completeExceptionally( + entry.getIndex(), "Failed to parse data, entry=" + entry, e); + } + if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITEHEADER) { + return null; + } + + final WriteRequestHeaderProto h = proto.getWriteHeader(); + final CompletableFuture<Integer> f = files.write(entry.getIndex(), + h.getPath().toStringUtf8(), h.getClose(), h.getOffset(), smLog.getStateMachineData()); + // sync only if closing the file + return h.getClose()? f: null; + } + + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + final LogEntryProto entry = trx.getLogEntry(); + + final long index = entry.getIndex(); + updateLatestTermIndex(entry.getTerm(), index); + + final SMLogEntryProto smLog = entry.getSmLogEntry(); + final FileStoreRequestProto request; + try { + request = FileStoreRequestProto.parseFrom(smLog.getData()); + } catch (InvalidProtocolBufferException e) { + return FileStoreCommon.completeExceptionally(index, + "Failed to parse SmLogEntry", e); + } + + switch(request.getRequestCase()) { + case DELETE: + return delete(index, request.getDelete()); + case WRITEHEADER: + return writeCommit(index, request.getWriteHeader(), smLog.getStateMachineData().size()); + case WRITE: + // WRITE should not happen here since + // startTransaction converts WRITE requests to WRITEHEADER requests. 
+ default: + LOG.error(getId() + ": Unexpected request case " + request.getRequestCase()); + return FileStoreCommon.completeExceptionally(index, + "Unexpected request case " + request.getRequestCase()); + } + } + + private CompletableFuture<Message> writeCommit( + long index, WriteRequestHeaderProto header, int size) { + final String path = header.getPath().toStringUtf8(); + return files.submitCommit(index, path, header.getClose(), header.getOffset(), size) + .thenApply(reply -> () -> reply.toByteString()); + } + + private CompletableFuture<Message> delete(long index, DeleteRequestProto request) { + final String path = request.getPath().toStringUtf8(); + return files.delete(index, path).thenApply(resolved -> () -> + DeleteReplyProto.newBuilder().setResolvedPath( + FileStoreCommon.toByteString(resolved)).build().toByteString()); + } + + private void updateLatestTermIndex(long term, long index) { + final TermIndex newTI = TermIndex.newTermIndex(term, index); + final TermIndex oldTI = latestTermIndex.getAndSet(newTI); + if (oldTI != null) { + Preconditions.assertTrue(newTI.compareTo(oldTI) >= 0); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java new file mode 100644 index 0000000..8991b0d --- /dev/null +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.examples.filestore; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.conf.ConfUtils; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.io.netty.util.internal.ThreadLocalRandom; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.*; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + public static final Logger LOG = LoggerFactory.getLogger(FileStoreBaseTest.class); + + { + final RaftProperties p = getProperties(); + p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + FileStoreStateMachine.class, StateMachine.class); + ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY, + new 
File(BaseTest.getRootTestDir(), "filestore")); + } + + static final int NUM_PEERS = 3; + + @Test + public void testFileStore() throws Exception { + final CLUSTER cluster = newCluster(NUM_PEERS); + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + + final CheckedSupplier<FileStoreClient, IOException> newClient = + () -> new FileStoreClient(cluster.getGroup(), getProperties()); + + testSingleFile("foo", SizeInBytes.valueOf("10M"), newClient); + testMultipleFiles("file", 100, SizeInBytes.valueOf("1M"), newClient); + + cluster.shutdown(); + } + + private static void testSingleFile( + String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, IOException> newClient) + throws Exception { + LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength); + + try (final Writer w = new Writer(path, fileLength, newClient)) { + w.write().verify().delete(); + } + } + + private static void testMultipleFiles( + String pathPrefix, int numFile, SizeInBytes fileLength, + CheckedSupplier<FileStoreClient, IOException> newClient) throws Exception { + LOG.info("runTestMultipleFile with pathPrefix={}, numFile={}, fileLength={}", + pathPrefix, numFile, fileLength); + + final ExecutorService executor = Executors.newFixedThreadPool(20); + + final List<Future<Writer>> writerFutures = new ArrayList<>(); + for (int i = 0; i < numFile; i++) { + final String path = String.format("%s%02d", pathPrefix, i); + final Callable<Writer> callable = LogUtils.newCallable(LOG, + () -> new Writer(path, fileLength, newClient).write(), + () -> path + ":" + fileLength); + writerFutures.add(executor.submit(callable)); + } + + final List<Writer> writers = new ArrayList<>(); + for(Future<Writer> f : writerFutures) { + writers.add(f.get()); + } + + writerFutures.clear(); + for (Writer w : writers) { + writerFutures.add(executor.submit(() -> w.verify().delete())); + } + for(Future<Writer> f : writerFutures) { + f.get().close(); + } + + executor.shutdown(); + } + + static class Writer 
implements Closeable { + final long seed = ThreadLocalRandom.current().nextLong(); + final byte[] buffer = new byte[4 << 10]; + + final String fileName; + final SizeInBytes fileSize; + final FileStoreClient client; + + Writer(String fileName, SizeInBytes fileSize, CheckedSupplier<FileStoreClient, IOException> newClient) + throws IOException { + this.fileName = fileName; + this.fileSize = fileSize; + this.client = newClient.get(); + } + + ByteBuffer randomBytes(int length, Random random) { + Preconditions.assertTrue(length <= buffer.length); + random.nextBytes(buffer); + final ByteBuffer b = ByteBuffer.wrap(buffer); + b.limit(length); + return b; + } + + Writer write() throws IOException { + final Random r = new Random(seed); + final int size = fileSize.getSizeInt(); + + for(int offset = 0; offset < size; ) { + final int remaining = size - offset; + final int n = Math.min(remaining, buffer.length); + final boolean close = n == remaining; + + final ByteBuffer b = randomBytes(n, r); + + LOG.trace("client write {}, offset={}", fileName, offset); + final long written = client.write(fileName, offset, close, b); + Assert.assertEquals(n, written); + offset += written; + } + return this; + } + + Writer verify() throws IOException { + final Random r = new Random(seed); + final int size = fileSize.getSizeInt(); + + for(int offset = 0; offset < size; ) { + final int remaining = size - offset; + final int n = Math.min(remaining, buffer.length); + + final ByteString read = client.read(fileName, offset, n); + Assert.assertEquals(n, read.size()); + + final ByteBuffer b = randomBytes(n, r); + + assertBuffers(offset, n, b, read.asReadOnlyByteBuffer()); + offset += n; + } + return this; + } + + Writer delete() throws IOException { + client.delete(fileName); + return this; + } + + @Override + public void close() throws IOException { + client.close(); + } + } + + static void assertBuffers(int offset, int length, ByteBuffer expected, ByteBuffer computed) { + try { + 
Assert.assertEquals(expected, computed); + } catch(AssertionError e) { + LOG.error("Buffer mismatched at offset=" + offset + ", length=" + length + + "expected = " + StringUtils.bytes2HexString(expected) + "\n" + + "computed = " + StringUtils.bytes2HexString(computed) + "\n", e); + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java new file mode 100644 index 0000000..71ae294 --- /dev/null +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.examples.filestore; + +import org.apache.ratis.grpc.MiniRaftClusterWithGRpc; + +public class TestFileStoreWithGrpc + extends FileStoreBaseTest<MiniRaftClusterWithGRpc> + implements MiniRaftClusterWithGRpc.FactoryGet { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java new file mode 100644 index 0000000..9b38e21 --- /dev/null +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithNetty.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.examples.filestore; + +import org.apache.ratis.netty.MiniRaftClusterWithNetty; + +public class TestFileStoreWithNetty + extends FileStoreBaseTest<MiniRaftClusterWithNetty> + implements MiniRaftClusterWithNetty.FactoryGet { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index cb337b5..ebe3184 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -21,7 +21,6 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyRpcProxy; import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.ServerInformationReply; import org.apache.ratis.rpc.SupportedRpcType; @@ -44,15 +43,12 @@ import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyExceptionReplyPr import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto; import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; import org.apache.ratis.util.CodeInjectionForTesting; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.ProtoUtils; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Objects; -import java.util.function.Supplier; /** * A netty server endpoint that acts as the communication layer. 
@@ -237,7 +233,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, .setSuccess(false); final RaftNettyExceptionReplyProto.Builder ioe = RaftNettyExceptionReplyProto.newBuilder() .setRpcReply(rpcReply) - .setException(ProtoUtils.toByteString(e)); + .setException(ProtoUtils.writeObject2ByteString(e)); return RaftNettyServerReplyProto.newBuilder().setExceptionReply(ioe).build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-proto-shaded/src/main/proto/Examples.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Examples.proto b/ratis-proto-shaded/src/main/proto/Examples.proto new file mode 100644 index 0000000..6efef5b --- /dev/null +++ b/ratis-proto-shaded/src/main/proto/Examples.proto @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +syntax = "proto3"; +option java_package = "org.apache.ratis.shaded.proto"; +option java_outer_classname = "ExamplesProtos"; +option java_generate_equals_and_hash = true; +package ratis.example; + +message FileStoreRequestProto { + oneof Request { + WriteRequestHeaderProto writeHeader = 1; + WriteRequestProto write = 2; + DeleteRequestProto delete = 3; + } +} + +message ReadRequestProto { + bytes path = 1; + uint64 offset = 2; + uint64 length = 3; +} + +message WriteRequestHeaderProto { + bytes path = 1; + bool close = 2; // close the file after write? + uint64 offset = 3; +} + +message WriteRequestProto { + WriteRequestHeaderProto header = 1; + bytes data = 2; +} + +message DeleteRequestProto { + bytes path = 1; +} + +message ReadReplyProto { + bytes resolvedPath = 1; + uint64 offset = 2; + bytes data = 3; // returned data size may be smaller than the requested size +} + +message WriteReplyProto { + bytes resolvedPath = 1; + uint64 offset = 2; + uint64 length = 3; // bytes actually written +} + +message DeleteReplyProto { + bytes resolvedPath = 1; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 52a1761..a8284fb 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -44,6 +44,8 @@ message SMLogEntryProto { // TODO: This is not super efficient if the SM itself uses PB to serialize its own data for a // log entry. Data will be copied twice. We should directly support having any Message from SM bytes data = 1; + + bytes stateMachineData = 2; // State machine specific data which is not written to log. 
} message LeaderNoOp { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index f761a24..122ff51 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -136,7 +136,7 @@ public class RaftServerImpl implements RaftServerProtocol, return groupId; } - StateMachine getStateMachine() { + public StateMachine getStateMachine() { return proxy.getStateMachine(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index e80ca02..23d7c9a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -27,18 +27,14 @@ import org.apache.ratis.server.storage.RaftLogCache.SegmentFileInfo; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; import org.apache.ratis.server.storage.SegmentedRaftLog.Task; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.util.ExitUtils; -import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.IOUtils; -import org.apache.ratis.util.Preconditions; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
java.io.File; import java.io.IOException; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * This class takes the responsibility of all the raft log related I/O ops for a @@ -58,6 +54,7 @@ class RaftLogWorker implements Runnable { private final RaftStorage storage; private volatile LogOutputStream out; private final RaftServerImpl raftServer; + private final StateMachine stateMachine; /** * The number of entries that have been written into the LogOutputStream but @@ -81,6 +78,7 @@ class RaftLogWorker implements Runnable { LOG.info("new {} for {}", name, storage); this.raftServer = raftServer; + this.stateMachine = raftServer != null? raftServer.getStateMachine(): null; this.storage = storage; this.segmentMaxSize = @@ -251,9 +249,32 @@ class RaftLogWorker implements Runnable { private class WriteLog extends Task { private final LogEntryProto entry; + private final CompletableFuture<?> stateMachineFuture; WriteLog(LogEntryProto entry) { - this.entry = entry; + this.entry = ProtoUtils.removeStateMachineData(entry); + if (this.entry == entry || stateMachine == null) { + this.stateMachineFuture = null; + } else { + // this.entry != entry iff the entry has state machine data + this.stateMachineFuture = stateMachine.writeStateMachineData(entry); + } + } + + @Override + void waitForDone() throws InterruptedException { + super.waitForDone(); + // TODO: It does not work since logSync only waits for the last task L. + // TODO: If some task T earlier than L has a writeStateMachineData future, it will not be sync'ed. 
+ // TODO: Need RATIS-124 + + if (stateMachineFuture != null) { + try { + stateMachineFuture.get(); + } catch (ExecutionException e) { + // ignore + } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java index d35dc88..dbf19b0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java @@ -38,11 +38,16 @@ import org.apache.ratis.util.LifeCycle; */ public class BaseStateMachine implements StateMachine { + private volatile RaftPeerId id; protected RaftProperties properties; protected RaftStorage storage; protected RaftConfiguration raftConf; protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); + public RaftPeerId getId() { + return id; + } + @Override public LifeCycle.State getLifeCycleState() { return lifeCycle.getCurrentState(); @@ -51,6 +56,7 @@ public class BaseStateMachine implements StateMachine { @Override public void initialize(RaftPeerId id, RaftProperties properties, RaftStorage storage) throws IOException { + this.id = id; lifeCycle.setName(getClass().getSimpleName() + ":" + id); this.properties = properties; this.storage = storage; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 17990cd..ae26cb1 100644 --- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -24,7 +24,9 @@ import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.LifeCycle; import java.io.Closeable; @@ -124,6 +126,17 @@ public interface StateMachine extends Closeable { throws IOException; /** + * Asynchronously write the state machine data to this state machine. + * + * @return a future for the write task + * if {@link RaftLog#logSync()} should also sync writing the state machine data; + * otherwise, return null. + */ + default CompletableFuture<?> writeStateMachineData(LogEntryProto entry) { + return null; + } + + /** * This is called before the transaction passed from the StateMachine is appended to the raft log. * This method will be called from log append and having the same strict serial order that the * transactions will have in the RAFT log. 
Since this is called serially in the critical path of http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java index 3a6adfc..df9a4fe 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java @@ -19,11 +19,13 @@ package org.apache.ratis.statemachine; import java.io.IOException; import java.util.Collection; -import java.util.Optional; +import java.util.Objects; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.util.Preconditions; /** * Context for a transaction. 
@@ -129,8 +131,8 @@ public class TransactionContext { */ public TransactionContext(StateMachine stateMachine, LogEntryProto logEntry) { this(stateMachine); + setLogEntry(logEntry); this.smLogEntryProto = logEntry.getSmLogEntry(); - this.logEntry = logEntry; } public RaftClientRequest getClientRequest() { @@ -155,6 +157,11 @@ public class TransactionContext { } public TransactionContext setLogEntry(LogEntryProto logEntry) { + Objects.requireNonNull(logEntry, "logEntry == null"); + Preconditions.assertTrue(logEntry.getLogEntryBodyCase() == LogEntryBodyCase.SMLOGENTRY, + () -> "LogEntryBodyCase = " + logEntry.getLogEntryBodyCase() + + " != " + LogEntryBodyCase.SMLOGENTRY + ", logEntry=" + logEntry); + Preconditions.assertTrue(this.logEntry == null, "this.logEntry != null"); this.logEntry = logEntry; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index a7a9412..019e8cd 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -43,6 +43,7 @@ import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -56,7 +57,17 @@ public abstract class MiniRaftCluster { public static abstract class Factory<CLUSTER extends MiniRaftCluster> { public interface Get<CLUSTER extends MiniRaftCluster> { + Supplier<RaftProperties> properties = JavaUtils.memoize(() -> new RaftProperties()); + Factory<CLUSTER> getFactory(); + + default RaftProperties getProperties() { 
+ return properties.get(); + } + + default CLUSTER newCluster(int numPeers) throws IOException { + return getFactory().newCluster(numPeers, getProperties()); + } } public abstract CLUSTER newCluster( @@ -216,7 +227,21 @@ public abstract class MiniRaftCluster { STATEMACHINE_CLASS_KEY, STATEMACHINE_CLASS_DEFAULT, StateMachine.class); - return ReflectionUtils.newInstance(smClass); + + final RuntimeException exception; + try { + return ReflectionUtils.newInstance(smClass); + } catch(RuntimeException e) { + exception = e; + } + + try { + final Class<?>[] argClasses = {RaftProperties.class}; + return ReflectionUtils.newInstance(smClass, argClasses, properties); + } catch(RuntimeException e) { + exception.addSuppressed(e); + } + throw exception; } public static List<RaftPeer> toRaftPeers( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java index 3877083..e5439d6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java @@ -50,7 +50,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster> @Before public void setup() throws IOException { - cluster = getFactory().newCluster(NUM_PEERS, new RaftProperties()); + cluster = newCluster(NUM_PEERS); cluster.start(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8a40ee4b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 4fe9edc..927ad88 100644 --- 
a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -33,12 +33,12 @@ import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ProtoUtils; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.Objects; @@ -162,10 +162,6 @@ public interface RaftTestUtil { } } - static ByteString toByteString(String string) { - return ByteString.copyFrom(string, StandardCharsets.UTF_8); - } - class SimpleMessage implements Message { public static SimpleMessage[] create(int numMessages) { return create(numMessages, "m"); @@ -180,9 +176,11 @@ public interface RaftTestUtil { } final String messageId; + final ByteString bytes; public SimpleMessage(final String messageId) { this.messageId = messageId; + this.bytes = ProtoUtils.toByteString(messageId); } @Override @@ -209,15 +207,18 @@ public interface RaftTestUtil { @Override public ByteString getContent() { - return toByteString(messageId); + return bytes; } } class SimpleOperation { private final String op; + private final SMLogEntryProto smLogEntryProto; public SimpleOperation(String op) { this.op = Objects.requireNonNull(op); + this.smLogEntryProto = SMLogEntryProto.newBuilder() + .setData(ProtoUtils.toByteString(op)).build(); } @Override @@ -238,7 +239,7 @@ public interface RaftTestUtil { } public SMLogEntryProto getLogEntryContent() { - return SMLogEntryProto.newBuilder().setData(toByteString(op)).build(); + return smLogEntryProto; } }
