Repository: incubator-ratis Updated Branches: refs/heads/master 27a8cbb49 -> bef9a72e3
RATIS-447. LogAppender should time out if readStateMachineData takes a long time. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/bef9a72e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/bef9a72e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/bef9a72e Branch: refs/heads/master Commit: bef9a72e32ee93adf3d532c2397e01e098afff59 Parents: 27a8cbb Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Thu Dec 6 10:38:38 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Thu Dec 6 10:38:38 2018 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/util/CollectionUtils.java | 8 + .../java/org/apache/ratis/util/DataQueue.java | 142 +++++++++++++++ .../org/apache/ratis/util/TimeDuration.java | 24 ++- .../java/org/apache/ratis/util/Timestamp.java | 19 +- .../function/CheckedFunctionWithTimeout.java | 32 ++++ .../apache/ratis/util/function/TriConsumer.java | 28 +++ .../java/org/apache/ratis/TestBatchAppend.java | 169 ------------------ .../apache/ratis/grpc/server/GrpcService.java | 4 +- .../hadooprpc/TestLogAppenderWithHadoopRpc.java | 25 +++ .../ratis/server/RaftServerConfigKeys.java | 28 +-- .../apache/ratis/server/impl/LogAppender.java | 98 +++-------- .../apache/ratis/server/impl/ServerState.java | 2 +- .../apache/ratis/server/storage/RaftLog.java | 20 ++- .../ratis/server/storage/SegmentedRaftLog.java | 2 +- .../java/org/apache/ratis/LogAppenderTests.java | 151 ++++++++++++++++ .../java/org/apache/ratis/RaftBasicTests.java | 1 - .../org/apache/ratis/RaftExceptionBaseTest.java | 2 +- .../SimpleStateMachine4Testing.java | 4 +- .../ratis/grpc/TestLogAppenderWithGrpc.java | 25 +++ .../ratis/netty/TestLogAppenderWithNetty.java | 25 +++ .../TestLogAppenderWithSimulatedRpc.java | 25 +++ .../org/apache/ratis/util/TestDataQueue.java | 172 +++++++++++++++++++ 
.../org/apache/ratis/util/TestTimeDuration.java | 65 ++++++- 23 files changed, 800 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java index 57222a6..cb49847 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java @@ -28,6 +28,14 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; public interface CollectionUtils { + static <T> T min(T left, T right, Comparator<T> comparator) { + return comparator.compare(left, right) < 0? left: right; + } + + static <T extends Comparable<T>> T min(T left, T right) { + return min(left, right, Comparator.naturalOrder()); + } + /** * @return the next element in the iteration right after the given element; * if the given element is not in the iteration, return the first one http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java new file mode 100644 index 0000000..d7819cf --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.apache.ratis.util.function.CheckedFunctionWithTimeout; +import org.apache.ratis.util.function.TriConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.ToIntFunction; + +/** + * A queue for data elements + * such that the queue imposes limits on both number of elements and the data size in bytes. + * + * Null element is NOT supported. + * + * This class is NOT threadsafe. + */ +public class DataQueue<E> { + public static final Logger LOG = LoggerFactory.getLogger(DataQueue.class); + + private final Object name; + private final int byteLimit; + private final int elementLimit; + private final ToIntFunction<E> getNumBytes; + + private final Queue<E> q; + + private int numBytes = 0; + + public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToIntFunction<E> getNumBytes) { + this.name = name != null? 
name: this; + this.byteLimit = byteLimit.getSizeInt(); + this.elementLimit = elementLimit; + this.getNumBytes = getNumBytes; + this.q = new ArrayDeque<>(elementLimit); + } + + public int getNumBytes() { + return numBytes; + } + + public int getNumElements() { + return q.size(); + } + + public final boolean isEmpty() { + return getNumElements() == 0; + } + + public void clear() { + q.clear(); + numBytes = 0; + } + + /** + * Adds an element to this queue. + * + * @return true if the element is added successfully; + * otherwise, the element is not added, return false. + */ + public boolean offer(E element) { + Objects.requireNonNull(element, "element == null"); + if (elementLimit > 0 && q.size() >= elementLimit) { + return false; + } + final int elementNumBytes = getNumBytes.applyAsInt(element); + Preconditions.assertTrue(elementNumBytes >= 0, + () -> name + ": elementNumBytes = " + elementNumBytes + " < 0"); + if (byteLimit > 0) { + Preconditions.assertTrue(elementNumBytes <= byteLimit, + () -> name + ": elementNumBytes = " + elementNumBytes + " > byteLimit = " + byteLimit); + if (numBytes > byteLimit - elementNumBytes) { + return false; + } + } + q.offer(element); + numBytes += elementNumBytes; + return true; + } + + /** Poll a list of the results within the given timeout. 
*/ + public <RESULT, THROWABLE extends Throwable> List<RESULT> pollList(long timeoutMs, + CheckedFunctionWithTimeout<E, RESULT, THROWABLE> getResult, + TriConsumer<E, TimeDuration, TimeoutException> timeoutHandler) throws THROWABLE { + if (timeoutMs <= 0 || q.isEmpty()) { + return Collections.emptyList(); + } + + final Timestamp startTime = Timestamp.currentTime(); + final TimeDuration limit = TimeDuration.valueOf(timeoutMs, TimeUnit.MILLISECONDS); + for(final List<RESULT> results = new ArrayList<>();;) { + final E peeked = q.peek(); + if (peeked == null) { // q is empty + return results; + } + + final TimeDuration remaining = limit.minus(startTime.elapsedTime()); + try { + results.add(getResult.apply(peeked, remaining)); + } catch (TimeoutException e) { + Optional.ofNullable(timeoutHandler).ifPresent(h -> h.accept(peeked, remaining, e)); + return results; + } + + final E polled = poll(); + Preconditions.assertTrue(polled == peeked); + } + } + + /** Poll out the head element from this queue. */ + public E poll() { + final E polled = q.poll(); + numBytes -= getNumBytes.applyAsInt(polled); + return polled; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java index 7daa4dd..41ba1c6 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java @@ -118,16 +118,38 @@ public class TimeDuration implements Comparable<TimeDuration> { return unit; } + /** + * Convert this {@link TimeDuration} to a long in the target unit. + * Note that the returned value may be truncated or saturated; see {@link TimeUnit#convert(long, TimeUnit)}.* + * + * @return the value in the target unit. 
+ */ public long toLong(TimeUnit targetUnit) { return targetUnit.convert(duration, unit); } + /** + * The same as Math.toIntExact(toLong(targetUnit)); + * Similar to {@link #toLong(TimeUnit)}, the returned value may be truncated. + * However, the returned value is never saturated. The method throws {@link ArithmeticException} if it overflows. + * + * @return the value in the target unit. + * @throws ArithmeticException if it overflows. + */ public int toInt(TimeUnit targetUnit) { return Math.toIntExact(toLong(targetUnit)); } + /** @return the {@link TimeDuration} in the target unit. */ public TimeDuration to(TimeUnit targetUnit) { - return valueOf(toLong(targetUnit), targetUnit); + return this.unit == targetUnit? this: valueOf(toLong(targetUnit), targetUnit); + } + + /** @return (this - that) in the minimum unit among them. */ + public TimeDuration minus(TimeDuration that) { + Objects.requireNonNull(that, "that == null"); + final TimeUnit minUnit = CollectionUtils.min(this.unit, that.unit); + return valueOf(this.toLong(minUnit) - that.toLong(minUnit), minUnit); } /** Round up to the given nanos to nearest multiple (in nanoseconds) of this {@link TimeDuration}. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java index 8ab3f6b..c33a864 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java @@ -17,6 +17,8 @@ */ package org.apache.ratis.util; +import java.util.concurrent.TimeUnit; + /** * Use {@link System#nanoTime()} as timestamps. 
* @@ -39,6 +41,11 @@ public class Timestamp implements Comparable<Timestamp> { return System.nanoTime(); } + /** @return a {@link Timestamp} for the current time. */ + public static Timestamp currentTime() { + return valueOf(currentTimeNanos()); + } + /** @return the latest timestamp. */ public static Timestamp latest(Timestamp a, Timestamp b) { return a.compareTo(b) > 0? a: b; @@ -66,8 +73,7 @@ public class Timestamp implements Comparable<Timestamp> { /** * @return the elapsed time in milliseconds. - * If the timestamp is a future time, - * this method returns a negative value. + * If the timestamp is a future time, the returned value is negative. */ public long elapsedTimeMs() { final long d = System.nanoTime() - nanos; @@ -75,6 +81,15 @@ public class Timestamp implements Comparable<Timestamp> { } /** + * @return the elapsed time in nanoseconds. + * If the timestamp is a future time, the returned value is negative. + */ + public TimeDuration elapsedTime() { + final long d = System.nanoTime() - nanos; + return TimeDuration.valueOf(d, TimeUnit.NANOSECONDS); + } + + /** * Compare two timestamps, t0 (this) and t1 (that). * This method uses {@code t0 - t1 < 0}, not {@code t0 < t1}, * in order to take care the possibility of numerical overflow. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java new file mode 100644 index 0000000..fddfab2 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util.function; + +import org.apache.ratis.util.TimeDuration; + +import java.util.concurrent.TimeoutException; + +/** Function with a timeout and a throws-clause. */ +@FunctionalInterface +public interface CheckedFunctionWithTimeout<INPUT, OUTPUT, THROWABLE extends Throwable> { + /** + * The same as {@link org.apache.ratis.util.CheckedFunction#apply(Object)} + * except that this method has a timeout parameter and throws {@link TimeoutException}. + */ + OUTPUT apply(INPUT input, TimeDuration timeout) throws TimeoutException, THROWABLE; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java b/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java new file mode 100644 index 0000000..a3cd283 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util.function; + +/** Consumer with three input parameters. */ +@FunctionalInterface +public interface TriConsumer<T, U, V> { + /** + * The same as {@link java.util.function.BiConsumer#accept(Object, Object)} + * except that this method is declared with three parameters. + */ + void accept(T t, U u, V v); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java b/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java deleted file mode 100644 index 7233e8f..0000000 --- a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis; - -import org.apache.log4j.Level; -import org.apache.ratis.RaftTestUtil.SimpleMessage; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.examples.ParameterizedBaseTest; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.ServerState; -import org.apache.ratis.server.storage.RaftLog; -import org.apache.ratis.statemachine.SimpleStateMachine4Testing; -import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.SizeInBytes; -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.EnumMap; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Enable raft.server.log.appender.batch.enabled and test LogAppender - */ -@RunWith(Parameterized.class) -public class TestBatchAppend extends BaseTest { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, 
Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - @Parameterized.Parameters - public static Collection<Object[]> data() throws IOException { - RaftProperties prop = new RaftProperties(); - prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, - SimpleStateMachine4Testing.class, StateMachine.class); - RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB")); - // enable batch appending - RaftServerConfigKeys.Log.Appender.setBatchEnabled(prop, true); - // set batch appending buffer size to 4KB - RaftServerConfigKeys.Log.Appender.setBufferCapacity(prop, SizeInBytes.valueOf("8KB")); - - return ParameterizedBaseTest.getMiniRaftClusters(prop, 3); - } - - @Parameterized.Parameter - public MiniRaftCluster cluster; - - @After - public void tearDown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - private class Sender extends Thread { - private final RaftClient client; - private final CountDownLatch latch; - private final SimpleMessage[] msgs; - private final AtomicBoolean succeed = new AtomicBoolean(false); - - Sender(RaftPeerId leaderId, CountDownLatch latch, int numMsg) { - this.latch = latch; - this.client = cluster.createClient(leaderId); - msgs = generateMsgs(numMsg); - } - - SimpleMessage[] generateMsgs(int num) { - SimpleMessage[] msgs = new SimpleMessage[num * 6]; - for (int i = 0; i < num; i++) { - for (int j = 0; j < 6; j++) { - byte[] bytes = new byte[1024 * (j + 1)]; - Arrays.fill(bytes, (byte) (j + '0')); - msgs[i * 6 + j] = new SimpleMessage(new String(bytes)); - } - } - return msgs; - } - - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException ignored) { - LOG.warn("Client {} waiting for countdown latch got interrupted", - client.getId()); - } - for (SimpleMessage msg : msgs) { - try { - client.send(msg); - } catch (IOException e) { - succeed.set(false); - LOG.warn("Client {} hit exception {}", client.getId(), e); - return; - } - } - succeed.set(true); - try { - 
client.close(); - } catch (IOException ignore) { - } - } - } - - @Test - public void testAppend() throws Exception { - final int numMsgs = 10; - final int numClients = 5; - cluster.start(); - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); - - // start several clients and write concurrently - CountDownLatch latch = new CountDownLatch(1); - final List<Sender> senders = Stream.iterate(0, i -> i+1).limit(numClients) - .map(i -> new Sender(leaderId, latch, numMsgs)) - .collect(Collectors.toList()); - senders.forEach(Thread::start); - - latch.countDown(); - - for (Sender s : senders) { - s.join(); - Assert.assertTrue(s.succeed.get()); - } - - final ServerState leaderState = cluster.getLeader().getState(); - final RaftLog leaderLog = leaderState.getLog(); - final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog); - LOG.info("counts = " + counts); - Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get()); - - final LogEntryProto lastStateMachineEntry = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, leaderLog); - LOG.info("lastStateMachineEntry = " + lastStateMachineEntry); - Assert.assertTrue(lastStateMachineEntry.getIndex() <= leaderState.getLastAppliedIndex()); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java index 9c94cca..4bd370f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -67,7 +67,7 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient this(server, 
server::getId, GrpcConfigKeys.Server.port(server.getProperties()), GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info), - RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()), + RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()), GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info), RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties())); } @@ -78,7 +78,7 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration))); if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) { throw new IllegalArgumentException("Illegal configuration: " - + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize + + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + appenderBufferSize + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java new file mode 100644 index 0000000..48489ad --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.ratis.LogAppenderTests; + +public class TestLogAppenderWithHadoopRpc + extends LogAppenderTests<MiniRaftClusterWithHadoopRpc> + implements MiniRaftClusterWithHadoopRpc.Factory.Get { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 25d4b0c..32f5752 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -227,23 +227,25 @@ public interface RaftServerConfigKeys { interface Appender { String PREFIX = Log.PREFIX + ".appender"; - String BUFFER_CAPACITY_KEY = PREFIX + ".buffer.capacity"; - SizeInBytes BUFFER_CAPACITY_DEFAULT =SizeInBytes.valueOf("4MB"); - static SizeInBytes bufferCapacity(RaftProperties properties) { - return getSizeInBytes(properties::getSizeInBytes, - BUFFER_CAPACITY_KEY, BUFFER_CAPACITY_DEFAULT, getDefaultLog()); + String BUFFER_ELEMENT_LIMIT_KEY = PREFIX + ".buffer.element-limit"; + /** 0 means no limit. 
*/ + int BUFFER_ELEMENT_LIMIT_DEFAULT = 0; + static int bufferElementLimit(RaftProperties properties) { + return getInt(properties::getInt, + BUFFER_ELEMENT_LIMIT_KEY, BUFFER_ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(0)); } - static void setBufferCapacity(RaftProperties properties, SizeInBytes bufferCapacity) { - setSizeInBytes(properties::set, BUFFER_CAPACITY_KEY, bufferCapacity); + static void setBufferElementLimit(RaftProperties properties, int bufferElementLimit) { + setInt(properties::setInt, BUFFER_ELEMENT_LIMIT_KEY, bufferElementLimit); } - String BATCH_ENABLED_KEY = PREFIX + ".batch.enabled"; - boolean BATCH_ENABLED_DEFAULT = false; - static boolean batchEnabled(RaftProperties properties) { - return getBoolean(properties::getBoolean, BATCH_ENABLED_KEY, BATCH_ENABLED_DEFAULT, getDefaultLog()); + String BUFFER_BYTE_LIMIT_KEY = PREFIX + ".buffer.byte-limit"; + SizeInBytes BUFFER_BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("4MB"); + static SizeInBytes bufferByteLimit(RaftProperties properties) { + return getSizeInBytes(properties::getSizeInBytes, + BUFFER_BYTE_LIMIT_KEY, BUFFER_BYTE_LIMIT_DEFAULT, getDefaultLog()); } - static void setBatchEnabled(RaftProperties properties, boolean batchEnabled) { - setBoolean(properties::setBoolean, BATCH_ENABLED_KEY, batchEnabled); + static void setBufferByteLimit(RaftProperties properties, SizeInBytes bufferByteLimit) { + setSizeInBytes(properties::set, BUFFER_BYTE_LIMIT_KEY, bufferByteLimit); } String SNAPSHOT_CHUNK_SIZE_MAX_KEY = PREFIX + ".snapshot.chunk.size.max"; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 6cb8538..633496e 100644 --- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -58,9 +58,8 @@ public class LogAppender { private final LeaderState leaderState; protected final RaftLog raftLog; protected final FollowerInfo follower; - private final int maxBufferSize; - private final boolean batchSending; - private final LogEntryBuffer buffer; + + private final DataQueue<EntryWithData> buffer; private final int snapshotChunkMaxSize; protected final long halfMinTimeoutMs; @@ -74,12 +73,12 @@ public class LogAppender { this.raftLog = server.getState().getLog(); final RaftProperties properties = server.getProxy().getProperties(); - this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt(); - this.batchSending = RaftServerConfigKeys.Log.Appender.batchEnabled(properties); this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt(); this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2; - this.buffer = new LogEntryBuffer(); + final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties); + final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties); + this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize); this.lifeCycle = new LifeCycle(this); } @@ -133,51 +132,6 @@ public class LogAppender { return getFollower().getPeer().getId(); } - /** - * A buffer for log entries with size limitation. - */ - private class LogEntryBuffer { - private final List<EntryWithData> buf = new ArrayList<>(); - private int totalSize = 0; - - /** - * Adds a log entry to the Log entry buffer. 
- * Checks if enough space is available before adding the entry to the buffer. - * @return true if the entry is added successfully; - * otherwise, the entry is not added, return false. - */ - boolean addEntry(EntryWithData entry) { - final int entrySize = entry.getSerializedSize(); - if (totalSize + entrySize <= maxBufferSize) { - buf.add(entry); - totalSize += entrySize; - return true; - } - return false; - } - - boolean isEmpty() { - return buf.isEmpty(); - } - - AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) throws RaftLogIOException { - final List<LogEntryProto> protos = new ArrayList<>(); - // Wait for all the log entry futures to complete and then create a list of LogEntryProtos. - for (EntryWithData bufEntry : buf) { - protos.add(bufEntry.getEntry()); - } - final AppendEntriesRequestProto request = leaderState.newAppendEntriesRequestProto( - getFollowerId(), previous, protos, !follower.isAttendingVote(), callId); - buf.clear(); - totalSize = 0; - return request; - } - - int getPendingEntryNum() { - return buf.size(); - } - } - private TermIndex getPrevious() { TermIndex previous = raftLog.getTermIndex(follower.getNextIndex() - 1); if (previous == null) { @@ -194,28 +148,29 @@ public class LogAppender { protected AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException { final TermIndex previous = getPrevious(); + final long heartbeatRemainingMs = getHeartbeatRemainingTime(); + if (heartbeatRemainingMs <= 0L) { + return leaderState.newAppendEntriesRequestProto( + getFollowerId(), previous, Collections.emptyList(), !follower.isAttendingVote(), callId); + } + + Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements."); + final long leaderNext = raftLog.getNextIndex(); - long next = follower.getNextIndex() + buffer.getPendingEntryNum(); - final boolean toSend; - - if (leaderNext == next && !buffer.isEmpty()) { - // no new entries, then send out the entries in the 
buffer - toSend = true; - } else if (leaderNext > next) { - boolean hasSpace = true; - for(; hasSpace && leaderNext > next;) { - hasSpace = buffer.addEntry(raftLog.getEntryWithData(next++)); + for (long next = follower.getNextIndex(); leaderNext > next; ) { + if (!buffer.offer(raftLog.getEntryWithData(next++))) { + break; } - // buffer is full or batch sending is disabled, send out a request - toSend = !hasSpace || !batchSending; - } else { - toSend = false; } - - if (toSend || shouldHeartbeat()) { - return buffer.getAppendRequest(previous, callId); + if (buffer.isEmpty()) { + return null; } - return null; + + final List<LogEntryProto> protos = buffer.pollList(heartbeatRemainingMs, EntryWithData::getEntry, + (entry, time, exception) -> LOG.warn(this + ": Failed get " + entry + " in " + time, exception)); + buffer.clear(); + return leaderState.newAppendEntriesRequestProto( + getFollowerId(), previous, protos, !follower.isAttendingVote(), callId); } /** Send an appendEntries RPC; retry indefinitely. 
*/ @@ -442,8 +397,7 @@ public class LogAppender { } } } - if (isAppenderRunning() && !shouldAppendEntries( - follower.getNextIndex() + buffer.getPendingEntryNum())) { + if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) { final long waitTime = getHeartbeatRemainingTime(); if (waitTime > 0) { synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index ee5218d..7dfc331 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -182,7 +182,7 @@ public class ServerState implements Closeable { final RaftLog log; if (RaftServerConfigKeys.Log.useMemory(prop)) { final int maxBufferSize = - RaftServerConfigKeys.Log.Appender.bufferCapacity(prop).getSizeInt(); + RaftServerConfigKeys.Log.Appender.bufferByteLimit(prop).getSizeInt(); log = new MemoryRaftLog(id, lastIndexInSnapshot, maxBufferSize); } else { log = new SegmentedRaftLog(id, server, this.storage, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 8478617..155122e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor 
license agreements. See the NOTICE file * distributed with this work for additional information @@ -31,6 +31,7 @@ import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.OpenCloseState; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @@ -412,18 +414,25 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable { this.future = future; } + public long getIndex() { + return logEntry.getIndex(); + } + public int getSerializedSize() { return ServerProtoUtils.getSerializedSize(logEntry); } - public LogEntryProto getEntry() throws RaftLogIOException { + public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException { LogEntryProto entryProto; if (future == null) { return logEntry; } try { - entryProto = future.thenApply(data -> ServerProtoUtils.addStateMachineData(data, logEntry)).join(); + entryProto = future.thenApply(data -> ServerProtoUtils.addStateMachineData(data, logEntry)) + .get(timeout.getDuration(), timeout.getUnit()); + } catch (TimeoutException t) { + throw t; } catch (Throwable t) { final String err = selfId + ": Failed readStateMachineData for " + ServerProtoUtils.toLogEntryString(logEntry); @@ -440,5 +449,10 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable { } return entryProto; } + + @Override + public String toString() { + return ServerProtoUtils.toLogEntryString(logEntry); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java 
---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index d23e0a5..f5a7330 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -111,7 +111,7 @@ public class SegmentedRaftLog extends RaftLog { SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server, StateMachine stateMachine, Runnable submitUpdateCommitEvent, RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) { - super(selfId, lastIndexInSnapshot, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt()); + super(selfId, lastIndexInSnapshot, RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt()); this.server = Optional.ofNullable(server); this.storage = storage; segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java new file mode 100644 index 0000000..c8ddc0d --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.LogAppender; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.impl.ServerState; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.SizeInBytes; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.EnumMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + { + LogUtils.setLogLevel(LogAppender.LOG, Level.DEBUG); + } + + { + final RaftProperties prop = getProperties(); + prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, 
SimpleStateMachine4Testing.class, StateMachine.class); + + final SizeInBytes n = SizeInBytes.valueOf("8KB"); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, n); + RaftServerConfigKeys.Log.Appender.setBufferByteLimit(prop, n); + } + + static SimpleMessage[] generateMsgs(int num) { + SimpleMessage[] msgs = new SimpleMessage[num * 6]; + for (int i = 0; i < num; i++) { + for (int j = 0; j < 6; j++) { + byte[] bytes = new byte[1024 * (j + 1)]; + Arrays.fill(bytes, (byte) (j + '0')); + msgs[i * 6 + j] = new SimpleMessage(new String(bytes)); + } + } + return msgs; + } + + private static class Sender extends Thread { + private final RaftClient client; + private final CountDownLatch latch; + private final SimpleMessage[] messages; + private final AtomicBoolean succeed = new AtomicBoolean(false); + private final AtomicReference<Exception> exception = new AtomicReference<>(); + + Sender(RaftClient client, int numMessages, CountDownLatch latch) { + this.latch = latch; + this.client = client; + this.messages = generateMsgs(numMessages); + } + + @Override + public void run() { + try { + latch.await(); + for (SimpleMessage msg : messages) { + client.send(msg); + } + client.close(); + succeed.set(true); + } catch (Exception e) { + exception.compareAndSet(null, e); + } + } + } + + @Test + public void testSingleElementBuffer() throws Exception { + RaftServerConfigKeys.Log.Appender.setBufferElementLimit(getProperties(), 1); + runWithNewCluster(3, this::runTest); + } + + @Test + public void testUnlimitedElementBuffer() throws Exception { + RaftServerConfigKeys.Log.Appender.setBufferElementLimit(getProperties(), 0); + runWithNewCluster(3, this::runTest); + } + + void runTest(CLUSTER cluster) throws Exception { + final int numMsgs = 10; + final int numClients = 5; + final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); + + // start several clients and write concurrently + final CountDownLatch latch = new CountDownLatch(1); + final List<Sender> senders = 
Stream.iterate(0, i -> i+1).limit(numClients) + .map(i -> new Sender(cluster.createClient(leaderId), numMsgs, latch)) + .collect(Collectors.toList()); + senders.forEach(Thread::start); + + latch.countDown(); + + for (Sender s : senders) { + s.join(); + final Exception e = s.exception.get(); + if (e != null) { + throw e; + } + Assert.assertTrue(s.succeed.get()); + } + + final ServerState leaderState = cluster.getLeader().getState(); + final RaftLog leaderLog = leaderState.getLog(); + final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog); + LOG.info("counts = " + counts); + Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get()); + + final LogEntryProto last = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, leaderLog); + LOG.info("last = " + ServerProtoUtils.toLogEntryString(last)); + Assert.assertNotNull(last); + Assert.assertTrue(last.getIndex() <= leaderState.getLastAppliedIndex()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 90cc627..a21796f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -61,7 +61,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS)); } 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java index 58e26b2..bf43ba6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java @@ -55,7 +55,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster> public void setup() throws IOException { final RaftProperties prop = getProperties(); RaftServerConfigKeys.Log.Appender - .setBufferCapacity(prop, SizeInBytes.valueOf("4KB")); + .setBufferByteLimit(prop, SizeInBytes.valueOf("4KB")); cluster = newCluster(NUM_PEERS); cluster.start(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index d4c4021..6306ce2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -154,7 +154,9 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { Preconditions.assertNull(previous, "previous"); final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); dataMap.put(s, entry); - LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry)); + LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), + s.length() <= 10? 
s: s.substring(0, 10) + "...", + ServerProtoUtils.toLogEntryString(entry)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java new file mode 100644 index 0000000..5918efd --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc; + +import org.apache.ratis.LogAppenderTests; + +public class TestLogAppenderWithGrpc + extends LogAppenderTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java new file mode 100644 index 0000000..85427a7 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.netty; + +import org.apache.ratis.LogAppenderTests; + +public class TestLogAppenderWithNetty + extends LogAppenderTests<MiniRaftClusterWithNetty> + implements MiniRaftClusterWithNetty.FactoryGet { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java new file mode 100644 index 0000000..a23ce1d --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.LogAppenderTests; + +public class TestLogAppenderWithSimulatedRpc + extends LogAppenderTests<MiniRaftClusterWithSimulatedRpc> + implements MiniRaftClusterWithSimulatedRpc.FactoryGet { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java new file mode 100644 index 0000000..e465a1d --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.util; + +import org.apache.ratis.util.function.TriConsumer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.TimeoutException; + +public class TestDataQueue { + static <T> TriConsumer<T, TimeDuration, TimeoutException> getTimeoutHandler(boolean expctedTimeout) { + return (element, time, exception) -> { + if (!expctedTimeout) { + throw new AssertionError("Unexpected timeout to get element " + element + " in " + time, exception); + } + }; + } + + private void assertSizes(int expectedNumElements, int expectedNumBytes) { + Assert.assertEquals(expectedNumElements, q.getNumElements()); + Assert.assertEquals(expectedNumBytes, q.getNumBytes()); + } + + final SizeInBytes byteLimit = SizeInBytes.valueOf(100); + final int elementLimit = 5; + final DataQueue<Integer> q = new DataQueue<>(null, byteLimit, elementLimit, Integer::intValue); + + @Test(timeout = 1000) + public void testElementLimit() { + assertSizes(0, 0); + + int numBytes = 0; + for (int i = 0; i < elementLimit; i++) { + Assert.assertEquals(i, q.getNumElements()); + Assert.assertEquals(numBytes, q.getNumBytes()); + final boolean offered = q.offer(i); + Assert.assertTrue(offered); + numBytes += i; + assertSizes(i+1, numBytes); + } + { + final boolean offered = q.offer(0); + Assert.assertFalse(offered); + assertSizes(elementLimit, numBytes); + } + + { // poll all elements + final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false)); + Assert.assertEquals(elementLimit, polled.size()); + for (int i = 0; i < polled.size(); i++) { + Assert.assertEquals(i, polled.get(i).intValue()); + } + } + assertSizes(0, 0); + } + + @Test(timeout = 1000) + public void testByteLimit() { + assertSizes(0, 0); + + try { + q.offer(byteLimit.getSizeInt() + 1); + Assert.fail(); + } catch (IllegalStateException ignored) { + } + + final int halfBytes = byteLimit.getSizeInt() / 2; + { + final boolean offered = q.offer(halfBytes); 
+ Assert.assertTrue(offered); + assertSizes(1, halfBytes); + } + + { + final boolean offered = q.offer(halfBytes + 1); + Assert.assertFalse(offered); + assertSizes(1, halfBytes); + } + + { + final boolean offered = q.offer(halfBytes); + Assert.assertTrue(offered); + assertSizes(2, byteLimit.getSizeInt()); + } + + { + final boolean offered = q.offer(1); + Assert.assertFalse(offered); + assertSizes(2, byteLimit.getSizeInt()); + } + + { + final boolean offered = q.offer(0); + Assert.assertTrue(offered); + assertSizes(3, byteLimit.getSizeInt()); + } + + { // poll all elements + final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false)); + Assert.assertEquals(3, polled.size()); + Assert.assertEquals(halfBytes, polled.get(0).intValue()); + Assert.assertEquals(halfBytes, polled.get(1).intValue()); + Assert.assertEquals(0, polled.get(2).intValue()); + } + + assertSizes(0, 0); + } + + @Test(timeout = 1000) + public void testTimeout() { + assertSizes(0, 0); + + int numBytes = 0; + for (int i = 0; i < elementLimit; i++) { + Assert.assertEquals(i, q.getNumElements()); + Assert.assertEquals(numBytes, q.getNumBytes()); + final boolean offered = q.offer(i); + Assert.assertTrue(offered); + numBytes += i; + assertSizes(i+1, numBytes); + } + + { // poll with zero time + final List<Integer> polled = q.pollList(0, (i, timeout) -> i, getTimeoutHandler(false)); + Assert.assertTrue(polled.isEmpty()); + assertSizes(elementLimit, numBytes); + } + + final int halfElements = elementLimit / 2; + { // poll with timeout + final List<Integer> polled = q.pollList(100, (i, timeout) -> { + if (i == halfElements) { + // simulate timeout + throw new TimeoutException("i=" + i); + } + return i; + }, getTimeoutHandler(true)); + Assert.assertEquals(halfElements, polled.size()); + for (int i = 0; i < polled.size(); i++) { + Assert.assertEquals(i, polled.get(i).intValue()); + numBytes -= i; + } + assertSizes(elementLimit - halfElements, numBytes); + } + + { // poll the 
remaining elements + final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false)); + Assert.assertEquals(elementLimit - halfElements, polled.size()); + for (int i = 0; i < polled.size(); i++) { + Assert.assertEquals(halfElements + i, polled.get(i).intValue()); + } + } + assertSizes(0, 0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java index 782d80d..feb7b6c 100644 --- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java +++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ package org.apache.ratis.util; import org.junit.Test; -import java.sql.Time; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @@ -32,12 +31,12 @@ import static org.junit.Assert.assertNotNull; public class TestTimeDuration { @Test(timeout = 1000) - public void testTimeDuration() { + public void testAbbreviation() { Arrays.asList(TimeUnit.values()) .forEach(a -> assertNotNull(Abbreviation.valueOf(a.name()))); assertEquals(TimeUnit.values().length, Abbreviation.values().length); - final List<String> allSymbols = Arrays.asList(Abbreviation.values()).stream() + final List<String> allSymbols = Arrays.stream(Abbreviation.values()) .map(Abbreviation::getSymbols) .flatMap(List::stream) .collect(Collectors.toList()); @@ -45,7 +44,10 @@ public class TestTimeDuration { allSymbols.stream() .map(s -> "0" + s) .forEach(s -> assertEquals(s, 0L, parse(s, unit)))); + } + 
@Test(timeout = 1000) + public void testParse() { assertEquals(1L, parse("1000000 ns", TimeUnit.MILLISECONDS)); assertEquals(10L, parse("10000000 nanos", TimeUnit.MILLISECONDS)); assertEquals(100L, parse("100000000 nanosecond", TimeUnit.MILLISECONDS)); @@ -97,4 +99,59 @@ public class TestTimeDuration { assertEquals(nanosPerSecond, oneSecond.roundUp(nanosPerSecond)); assertEquals(2*nanosPerSecond, oneSecond.roundUp(nanosPerSecond + 1)); } + + @Test(timeout = 1000) + public void testTo() { + final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS); + assertTo(1000, oneSecond, TimeUnit.MILLISECONDS); + final TimeDuration nanos = assertTo(1_000_000_000, oneSecond, TimeUnit.NANOSECONDS); + assertTo(1000, nanos, TimeUnit.MILLISECONDS); + + assertTo(0, oneSecond, TimeUnit.MINUTES); + assertTo(0, nanos, TimeUnit.MINUTES); + + final TimeDuration millis = TimeDuration.valueOf(1_999, TimeUnit.MILLISECONDS); + assertTo(1, millis, TimeUnit.SECONDS); + assertTo(0, millis, TimeUnit.MINUTES); + } + + static TimeDuration assertTo(long expected, TimeDuration timeDuration, TimeUnit toUnit) { + final TimeDuration computed = timeDuration.to(toUnit); + assertEquals(expected, computed.getDuration()); + assertEquals(toUnit, computed.getUnit()); + return computed; + } + + @Test(timeout = 1000) + public void testMinus() { + final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS); + final TimeDuration tenSecond = TimeDuration.valueOf(10, TimeUnit.SECONDS); + { + final TimeDuration d = oneSecond.minus(oneSecond); + assertEquals(0, d.getDuration()); + assertEquals(TimeUnit.SECONDS, d.getUnit()); + } + { + final TimeDuration d = tenSecond.minus(oneSecond); + assertEquals(9, d.getDuration()); + assertEquals(TimeUnit.SECONDS, d.getUnit()); + } + { + final TimeDuration d = oneSecond.minus(tenSecond); + assertEquals(-9, d.getDuration()); + assertEquals(TimeUnit.SECONDS, d.getUnit()); + } + + final TimeDuration oneMS = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS); 
+ { + final TimeDuration d = oneSecond.minus(oneMS); + assertEquals(999, d.getDuration()); + assertEquals(TimeUnit.MILLISECONDS, d.getUnit()); + } + { + final TimeDuration d = oneMS.minus(oneSecond); + assertEquals(-999, d.getDuration()); + assertEquals(TimeUnit.MILLISECONDS, d.getUnit()); + } + } }
