Repository: incubator-ratis Updated Branches: refs/heads/master 6adf89d48 -> b5a07be08
RATIS-34. Use ConfUtils to get and set RaftServerConfigKeys. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/b5a07be0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/b5a07be0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/b5a07be0 Branch: refs/heads/master Commit: b5a07be08446f6c7148329b6b44a69199d62a49f Parents: 6adf89d Author: Jing Zhao <[email protected]> Authored: Mon Mar 6 15:42:17 2017 -0800 Committer: Jing Zhao <[email protected]> Committed: Mon Mar 6 15:42:17 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/conf/ConfUtils.java | 181 +++++++++++++--- .../java/org/apache/ratis/TestBatchAppend.java | 11 +- .../org/apache/ratis/TestRestartRaftPeer.java | 4 +- .../ratis/hadooprpc/HadoopConfigKeys.java | 4 +- ratis-netty/pom.xml | 1 - .../org/apache/ratis/netty/NettyConfigKeys.java | 14 +- .../ratis/server/RaftServerConfigKeys.java | 214 ++++++++++++++----- .../apache/ratis/server/impl/LeaderState.java | 60 ++---- .../apache/ratis/server/impl/LogAppender.java | 62 +++--- .../ratis/server/impl/RaftServerImpl.java | 8 +- .../apache/ratis/server/impl/ServerState.java | 22 +- .../ratis/server/impl/StateMachineUpdater.java | 12 +- .../ratis/server/storage/LogOutputStream.java | 34 ++- .../ratis/server/storage/RaftLogWorker.java | 7 +- .../ratis/server/storage/RaftStorage.java | 8 +- .../ratis/server/storage/SegmentedRaftLog.java | 20 +- .../java/org/apache/ratis/MiniRaftCluster.java | 10 +- .../java/org/apache/ratis/RaftTestUtil.java | 2 +- .../impl/RaftReconfigurationBaseTest.java | 6 +- .../MiniRaftClusterWithSimulatedRpc.java | 4 +- .../simulation/SimulatedRequestReply.java | 2 +- .../server/storage/TestRaftLogReadWrite.java | 32 ++- .../server/storage/TestRaftLogSegment.java | 26 +-- .../ratis/server/storage/TestRaftStorage.java | 2 +- .../server/storage/TestSegmentedRaftLog.java | 27 ++- .../statemachine/RaftSnapshotBaseTest.java | 19 +- .../ratis/statemachine/TestStateMachine.java | 37 ++-- 27 files changed, 494 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java index 8593aa4..34cdbc3 100644 --- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java @@ -21,50 +21,173 @@ import org.apache.ratis.util.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import java.util.function.Consumer; -public abstract class ConfUtils { - static Logger LOG = LoggerFactory.getLogger(ConfUtils.class); +public interface ConfUtils { + Logger LOG = LoggerFactory.getLogger(ConfUtils.class); - public static int getInt( - BiFunction<String, Integer, Integer> getInt, - String key, int defaultValue, Integer min, Integer max) { - final int value = getInt.apply(key, defaultValue); - final String s = key + " = " + value; - LOG.info(s); + static void logGet(String key, Object value) { + LOG.info("{} = {}", key, value); + } - if (min != null && value < min) { - throw new IllegalArgumentException(s + " < min = " + min); - } - if (max != null && value > max) { - throw new IllegalArgumentException(s + " > max = " + max); - } - return value; + static void logSet(String key, Object value) { + LOG.debug("set {} = {}", key, value); + } + + static BiConsumer<String, Integer> requireMin(int min) { + return (key, value) -> { + if (value < min) { + throw new IllegalArgumentException( + key + " = " + value + " < min = " + min); + } + }; + } + + static BiConsumer<String, Integer> requireMax(int max) { + return (key, value) -> { + if (value > max) { + throw new IllegalArgumentException( + key + " = " + value + " > max = " + max); + } + }; + } + + static BiConsumer<String, Long> requireMin(long min) { + return (key, value) -> { + if (value < min) { + throw new IllegalArgumentException( + key + " = " + value + " < min = " + min); + } + }; + } + + static BiConsumer<String, Long> requireMax(long max) { + return (key, value) -> { + if (value > max) { + throw new IllegalArgumentException( + key + " = " + value + " > max = " + max); + } + }; + } + + static boolean getBoolean( + BiFunction<String, Boolean, Boolean> booleanGetter, + String key, boolean defaultValue, BiConsumer<String, Boolean>... assertions) { + return get(booleanGetter, key, defaultValue, assertions); + } + + static int getInt( + BiFunction<String, Integer, Integer> integerGetter, + String key, int defaultValue, BiConsumer<String, Integer>... assertions) { + return get(integerGetter, key, defaultValue, assertions); + } + + static long getLong( + BiFunction<String, Long, Long> longGetter, + String key, long defaultValue, BiConsumer<String, Long>... assertions) { + return get(longGetter, key, defaultValue, assertions); } - public static <T> T get(BiFunction<String, T, T> getString, - String key, T defaultValue) { - final T value = getString.apply(key, defaultValue); - LOG.info(key + " = " + value); + static <T> T get(BiFunction<String, T, T> getter, + String key, T defaultValue, BiConsumer<String, T>... assertions) { + final T value = getter.apply(key, defaultValue); + logGet(key, value); + Arrays.asList(assertions).forEach(a -> a.accept(key, value)); return value; } - public static InetSocketAddress getInetSocketAddress( - BiFunction<String, String, String> getString, + static InetSocketAddress getInetSocketAddress( + BiFunction<String, String, String> stringGetter, String key, String defaultValue) { - return NetUtils.createSocketAddr(get(getString, key, defaultValue)); + return NetUtils.createSocketAddr(get(stringGetter, key, defaultValue)); + } + + static void setBoolean( + BiConsumer<String, Boolean> booleanSetter, String key, boolean value, + BiConsumer<String, Boolean>... assertions) { + set(booleanSetter, key, value, assertions); + } + + static void setInt( + BiConsumer<String, Integer> integerSetter, String key, int value, + BiConsumer<String, Integer>... assertions) { + set(integerSetter, key, value, assertions); + } + + static void setLong( + BiConsumer<String, Long> longSetter, String key, long value, + BiConsumer<String, Long>... assertions) { + set(longSetter, key, value, assertions); + } + + static <T> void set( + BiConsumer<String, T> setter, String key, T value, + BiConsumer<String, T>... assertions) { + Arrays.asList(assertions).forEach(a -> a.accept(key, value)); + setter.accept(key, value); + logSet(key, value); } - public static void setInt(BiConsumer<String, Integer> setInt, - String key, int value) { - setInt.accept(key, value); - LOG.info("set " + key + " = " + value); + static void printAll(Class<?> confClass) { + ConfUtils.printAll(confClass, System.out::println); } - public static <T> void set(BiConsumer<String, T> set, String key, T value) { - set.accept(key, value); - LOG.info("set " + key + " = " + value); + static void printAll(Class<?> confClass, Consumer<Object> out) { + out.accept(""); + out.accept("******* " + confClass + " *******"); + Arrays.asList(confClass.getDeclaredFields()) + .forEach(f -> printField(confClass, out, f)); + Arrays.asList(confClass.getClasses()) + .forEach(c -> printAll(c, s -> out.accept(" " + s))); + } + + static void printField(Class<?> confClass, Consumer<Object> out, Field f) { + if (!Modifier.isStatic(f.getModifiers())) { + out.accept("WARNING: Found non-static field " + f); + return; + } + final String fieldName = f.getName(); + if (!fieldName.endsWith("_KEY")) { + if (!fieldName.endsWith("_DEFAULT")) { + try { + out.accept("constant: " + fieldName + " = " + f.get(null)); + } catch (IllegalAccessException e) { + out.accept(fieldName + " is not public"); + } + } + return; + } + + final StringBuilder b = new StringBuilder(); + try { + final Object keyName = f.get(null); + b.append("key: ").append(keyName); + } catch (IllegalAccessException e) { + out.accept("WARNING: Failed to access key " + f); + b.append(fieldName + " is not public"); + } + final String defaultFieldName = fieldName.substring(0, fieldName.length() - 4) + "_DEFAULT"; + b.append(" ("); + try { + final Field defaultField = confClass.getDeclaredField(defaultFieldName); + b.append(defaultField.getType().getSimpleName()).append(", "); + + final Object defaultValue = defaultField.get(null); + b.append("default=").append(defaultValue); + } catch (NoSuchFieldException e) { + out.accept("WARNING: Default value not found for field " + f); + b.append("default not found"); + } catch (IllegalAccessException e) { + out.accept("WARNING: Failed to access default value " + f); + b.append(defaultFieldName).append(" is not public"); + } + b.append(")"); + out.accept(b); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java b/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java index 71d21eb..c9ee721 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java @@ -23,6 +23,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.examples.RaftExamplesTestUtil; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.simulation.RequestHandler; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; @@ -38,8 +39,6 @@ import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.ratis.server.RaftServerConfigKeys.*; - import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -49,6 +48,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.ratis.server.RaftServerConfigKeys.KB; + /** * Enable raft.server.log.appender.batch.enabled and test LogAppender */ @@ -66,11 +67,11 @@ public class TestBatchAppend { RaftProperties prop = new RaftProperties(); prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); - prop.setInt(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 1024 * 8); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop::setLong, 8*KB); // enable batch appending - prop.setBoolean(RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY, true); + RaftServerConfigKeys.Log.Appender.setBatchEnabled(prop::setBoolean, true); // set batch appending buffer size to 4KB - prop.setInt(RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY, 4 * 1024); + RaftServerConfigKeys.Log.Appender.setBufferCapacity(prop::setInt, 4*KB); return RaftExamplesTestUtil.getMiniRaftClusters(prop, 3); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java index 60f06cd..a5b7d69 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java @@ -43,6 +43,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import static org.apache.ratis.server.RaftServerConfigKeys.KB; + /** * Test restarting raft peers. */ @@ -61,7 +63,7 @@ public class TestRestartRaftPeer { RaftProperties prop = new RaftProperties(); prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); - prop.setInt(RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 1024 * 8); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop::setLong, 8*KB); return RaftExamplesTestUtil.getMiniRaftClusters(prop, 3); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java index cc09510..f65ee5d 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java @@ -25,6 +25,8 @@ import java.net.InetSocketAddress; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import static org.apache.ratis.conf.ConfUtils.requireMin; + /** Hadoop Rpc specific configuration properties. */ public interface HadoopConfigKeys { String PREFIX = "raft.hadooprpc"; @@ -53,7 +55,7 @@ public interface HadoopConfigKeys { static int handlers(BiFunction<String, Integer, Integer> getInt) { return ConfUtils.getInt(getInt, - HANDLERS_KEY, HANDLERS_DEFAULT, 1, null); + HANDLERS_KEY, HANDLERS_DEFAULT, requireMin(1)); } static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-netty/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-netty/pom.xml b/ratis-netty/pom.xml index 87dc7ef..fb6566b 100644 --- a/ratis-netty/pom.xml +++ b/ratis-netty/pom.xml @@ -37,7 +37,6 @@ <dependency> <artifactId>ratis-common</artifactId> <groupId>org.apache.ratis</groupId> - <scope>provided</scope> </dependency> <dependency> <artifactId>ratis-common</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java index 5cdeb65..fd2703d 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java @@ -22,6 +22,9 @@ import org.apache.ratis.conf.ConfUtils; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import static org.apache.ratis.conf.ConfUtils.requireMax; +import static org.apache.ratis.conf.ConfUtils.requireMin; + public interface NettyConfigKeys { String PREFIX = "raft.netty"; @@ -32,11 +35,16 @@ public interface NettyConfigKeys { int PORT_DEFAULT = 0; static int port(BiFunction<String, Integer, Integer> getInt) { - return ConfUtils.getInt(getInt, PORT_KEY, PORT_DEFAULT, 0, 65536); + return ConfUtils.getInt(getInt, + PORT_KEY, PORT_DEFAULT, requireMin(0), requireMax(65536)); } - static void setPort(BiConsumer<String, Integer> setString, int port) { - ConfUtils.setInt(setString, PORT_KEY, port); + static void setPort(BiConsumer<String, Integer> setInt, int port) { + ConfUtils.setInt(setInt, PORT_KEY, port); } } + + static void main(String[] args) { + ConfUtils.printAll(NettyConfigKeys.class); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 7721d3c..74c56a5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -17,62 +17,180 @@ */ package org.apache.ratis.server; -public interface RaftServerConfigKeys { - - String PREFIX = "raft.server"; - - String RAFT_SERVER_USE_MEMORY_LOG_KEY = "raft.server.use.memory.log"; - boolean RAFT_SERVER_USE_MEMORY_LOG_DEFAULT = false; - - String RAFT_SERVER_STORAGE_DIR_KEY = "raft.server.storage.dir"; - String RAFT_SERVER_STORAGE_DIR_DEFAULT = "file:///tmp/raft-server/"; - - /** whether trigger snapshot when log size exceeds limit */ - String RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY = "raft.server.auto.snapshot.enabled"; - /** by default let the state machine to decide when to do checkpoint */ - boolean RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT = false; - - /** log size limit (in number of log entries) that triggers the snapshot */ - String RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY = "raft.server.snapshot.trigger.threshold"; - long RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT = 400000; - - String RAFT_LOG_SEGMENT_MAX_SIZE_KEY = "raft.log.segment.max.size"; - long RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT = 1024L * 1024 * 8; // 8MB - - String RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY = "raft.log.segment.preallocated.size"; - int RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT = 1024 * 1024 * 4; // 4MB +import org.apache.ratis.conf.ConfUtils; - String RAFT_LOG_WRITE_BUFFER_SIZE_KEY = "raft.log.write.buffer.size"; - int RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT = 64 * 1024; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; - String RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY = "raft.snapshot.chunk.max.size"; - int RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT = 1024 * 1024 * 16; +import static org.apache.ratis.conf.ConfUtils.requireMin; - String RAFT_LOG_FORCE_SYNC_NUM_KEY = "raft.log.force.sync.num"; - int RAFT_LOG_FORCE_SYNC_NUM_DEFAULT = 128; - - /** server rpc timeout related */ - String RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY = "raft.server.rpc.timeout.min.ms"; - int RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT = 150; - - String RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY = "raft.server.rpc.timeout.max.ms"; - int RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT = 300; - - String RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY = "raft.server.rpc.sleep.time.ms"; - int RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT = 25; +public interface RaftServerConfigKeys { + String PREFIX = "raft.server"; + int KB = 1024; + int MB = 1024*KB; + + String STORAGE_DIR_KEY = PREFIX + ".storage.dir"; + String STORAGE_DIR_DEFAULT = "file:///tmp/raft-server/"; + static String storageDir(BiFunction<String, String, String> getTrimmed) { + return ConfUtils.get(getTrimmed, STORAGE_DIR_KEY, STORAGE_DIR_DEFAULT); + } + static void setStorageDir(BiConsumer<String, String> setString, String storageDir) { + ConfUtils.set(setString, STORAGE_DIR_KEY, storageDir); + } /** * When bootstrapping a new peer, If the gap between the match index of the * peer and the leader's latest committed index is less than this gap, we * treat the peer as caught-up. */ - String RAFT_SERVER_STAGING_CATCHUP_GAP_KEY = "raft.server.staging.catchup.gap"; - int RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT = 1000; // increase this number when write throughput is high - - String RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY = "raft.server.log.appender.buffer.capacity"; - int RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT = 4 * 1024 * 1024; // 4MB - - String RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY = "raft.server.log.appender.batch.enabled"; - boolean RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT = false; + String STAGING_CATCHUP_GAP_KEY = PREFIX + ".staging.catchup.gap"; + int STAGING_CATCHUP_GAP_DEFAULT = 1000; // increase this number when write throughput is high + static int stagingCatchupGap(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + STAGING_CATCHUP_GAP_KEY, STAGING_CATCHUP_GAP_DEFAULT, requireMin(0)); + } + static void setStagingCatchupGap(BiConsumer<String, Integer> setInt, int stagingCatchupGap) { + ConfUtils.setInt(setInt, STAGING_CATCHUP_GAP_KEY, stagingCatchupGap); + } + + interface Log { + String PREFIX = RaftServerConfigKeys.PREFIX + ".log"; + + String USE_MEMORY_KEY = PREFIX + ".use.memory"; + boolean USE_MEMORY_DEFAULT = false; + static boolean useMemory(BiFunction<String, Boolean, Boolean> getBool) { + return ConfUtils.getBoolean(getBool, USE_MEMORY_KEY, USE_MEMORY_DEFAULT); + } + static void setUseMemory(BiConsumer<String, Boolean> setLong, boolean useMemory) { + ConfUtils.setBoolean(setLong, USE_MEMORY_KEY, useMemory); + } + + String SEGMENT_SIZE_MAX_KEY = PREFIX + ".segment.size.max"; + long SEGMENT_SIZE_MAX_DEFAULT = 8*MB; + static long segmentSizeMax(BiFunction<String, Long, Long> getLong) { + return ConfUtils.getLong(getLong, + SEGMENT_SIZE_MAX_KEY, SEGMENT_SIZE_MAX_DEFAULT, requireMin(0L)); + } + static void setSegmentSizeMax( + BiConsumer<String, Long> setLong, long segmentSizeMax) { + ConfUtils.setLong(setLong, SEGMENT_SIZE_MAX_KEY, segmentSizeMax); + } + + String PREALLOCATED_SIZE_KEY = PREFIX + ".preallocated.size"; + int PREALLOCATED_SIZE_DEFAULT = 4*MB; + static int preallocatedSize(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + PREALLOCATED_SIZE_KEY, PREALLOCATED_SIZE_DEFAULT, requireMin(0)); + } + static void setPreallocatedSize(BiConsumer<String, Integer> setInt, int preallocatedSize) { + ConfUtils.setInt(setInt, PREALLOCATED_SIZE_KEY, preallocatedSize); + } + + String WRITE_BUFFER_SIZE_KEY = PREFIX + ".write.buffer.size"; + int WRITE_BUFFER_SIZE_DEFAULT = 64*KB; + static int writeBufferSize(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT, requireMin(0)); + } + static void setWriteBufferSize(BiConsumer<String, Integer> setInt, int writeBufferSize) { + ConfUtils.setInt(setInt, WRITE_BUFFER_SIZE_KEY, writeBufferSize); + } + + String FORCE_SYNC_NUM_KEY = PREFIX + ".force.sync.num"; + int FORCE_SYNC_NUM_DEFAULT = 128; + static int forceSyncNum(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + FORCE_SYNC_NUM_KEY, FORCE_SYNC_NUM_DEFAULT, requireMin(0)); + } + + interface Appender { + String PREFIX = Log.PREFIX + ".appender"; + + String BUFFER_CAPACITY_KEY = PREFIX + ".buffer.capacity"; + int BUFFER_CAPACITY_DEFAULT = 4*MB; + static int bufferCapacity(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + BUFFER_CAPACITY_KEY, BUFFER_CAPACITY_DEFAULT, requireMin(0)); + } + static void setBufferCapacity(BiConsumer<String, Integer> setInt, int bufferCapacity) { + ConfUtils.setInt(setInt, BUFFER_CAPACITY_KEY, bufferCapacity); + } + + String BATCH_ENABLED_KEY = PREFIX + ".batch.enabled"; + boolean BATCH_ENABLED_DEFAULT = false; + static boolean batchEnabled(BiFunction<String, Boolean, Boolean> getBool) { + return ConfUtils.getBoolean(getBool, BATCH_ENABLED_KEY, BATCH_ENABLED_DEFAULT); + } + static void setBatchEnabled( + BiConsumer<String, Boolean> setLong, boolean batchEnabled) { + ConfUtils.setBoolean(setLong, BATCH_ENABLED_KEY, batchEnabled); + } + + String SNAPSHOT_CHUNK_SIZE_MAX_KEY = PREFIX + ".snapshot.chunk.size.max"; + int SNAPSHOT_CHUNK_SIZE_MAX_DEFAULT = 16*MB; + static int snapshotChunkSizeMax(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + SNAPSHOT_CHUNK_SIZE_MAX_KEY, SNAPSHOT_CHUNK_SIZE_MAX_DEFAULT, requireMin(0)); + } + } + } + + interface Snapshot { + String PREFIX = RaftServerConfigKeys.PREFIX + ".snapshot"; + + /** whether trigger snapshot when log size exceeds limit */ + String AUTO_TRIGGER_ENABLED_KEY = PREFIX + ".auto.trigger.enabled"; + /** by default let the state machine to decide when to do checkpoint */ + boolean AUTO_TRIGGER_ENABLED_DEFAULT = false; + static boolean autoTriggerEnabled(BiFunction<String, Boolean, Boolean> getBool) { + return ConfUtils.getBoolean(getBool, + AUTO_TRIGGER_ENABLED_KEY, AUTO_TRIGGER_ENABLED_DEFAULT); + } + static void setAutoTriggerEnabled( + BiConsumer<String, Boolean> setLong, boolean autoTriggerThreshold) { + ConfUtils.setBoolean(setLong, AUTO_TRIGGER_ENABLED_KEY, autoTriggerThreshold); + } + + /** log size limit (in number of log entries) that triggers the snapshot */ + String AUTO_TRIGGER_THRESHOLD_KEY = PREFIX + ".auto.trigger.threshold"; + long AUTO_TRIGGER_THRESHOLD_DEFAULT = 400000L; + static long autoTriggerThreshold(BiFunction<String, Long, Long> getLong) { + return ConfUtils.getLong(getLong, + AUTO_TRIGGER_THRESHOLD_KEY, AUTO_TRIGGER_THRESHOLD_DEFAULT, requireMin(0L)); + } + static void setAutoTriggerThreshold( + BiConsumer<String, Long> setLong, long autoTriggerThreshold) { + ConfUtils.setLong(setLong, AUTO_TRIGGER_THRESHOLD_KEY, autoTriggerThreshold); + } + } + /** server rpc timeout related */ + interface Rpc { + String PREFIX = RaftServerConfigKeys.PREFIX + ".rpc"; + + String TIMEOUT_MIN_MS_KEY = PREFIX + ".timeout.min.ms"; + int TIMEOUT_MIN_MS_DEFAULT = 150; + static int timeoutMinMs(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + TIMEOUT_MIN_MS_KEY, TIMEOUT_MIN_MS_DEFAULT, requireMin(0)); + } + + String TIMEOUT_MAX_MS_KEY = PREFIX + ".timeout.max.ms"; + int TIMEOUT_MAX_MS_DEFAULT = 300; + static int timeoutMaxMs(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + TIMEOUT_MAX_MS_KEY, TIMEOUT_MAX_MS_DEFAULT, requireMin(0)); + } + + String SLEEP_TIME_MS_KEY = PREFIX + ".sleep.time.ms"; + int SLEEP_TIME_MS_DEFAULT = 25; + static int sleepTimeMs(BiFunction<String, Integer, Integer> getInt) { + return ConfUtils.getInt(getInt, + SLEEP_TIME_MS_KEY, SLEEP_TIME_MS_DEFAULT, requireMin(0)); + } + } + + static void main(String[] args) { + ConfUtils.printAll(RaftServerConfigKeys.class); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 1750a04..c5f0060 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -17,38 +17,10 @@ */ package org.apache.ratis.server.impl; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY; -import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.STAGINGPROGRESS; -import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.STEPDOWN; -import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.UPDATECOMMIT; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - +import com.google.common.base.Preconditions; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.ReconfigurationTimeoutException; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.*; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; @@ -59,7 +31,15 @@ import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.Timestamp; import org.slf4j.Logger; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.*; /** * States for leader only. It contains three different types of processors: @@ -113,22 +93,14 @@ public class LeaderState { private volatile boolean running = true; private final int stagingCatchupGap; - private final int snapshotChunkMaxSize; private final int syncInterval; private final long placeHolderIndex; LeaderState(RaftServerImpl server, RaftProperties properties) { this.server = server; - stagingCatchupGap = properties.getInt( - RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, - RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT); - snapshotChunkMaxSize = properties.getInt( - RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY, - RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT); - syncInterval = properties.getInt( - RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY, - RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT); + stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties::getInt); + syncInterval = RaftServerConfigKeys.Rpc.sleepTimeMs(properties::getInt); final ServerState state = server.getState(); this.raftLog = state.getLog(); @@ -205,10 +177,6 @@ public class LeaderState { return currentTerm; } - int getSnapshotChunkMaxSize() { - return snapshotChunkMaxSize; - } - int getSyncInterval() { return syncInterval; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 5599699..fdab93d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -17,44 +17,31 @@ */ package org.apache.ratis.server.impl; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY; -import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.UUID; - +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.io.IOUtils; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.Timestamp; import org.slf4j.Logger; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.file.Path; +import java.util.*; + +import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; /** * A daemon thread appending log entries to a follower peer. @@ -70,6 +57,7 @@ public class LogAppender extends Daemon { private final boolean batchSending; private final LogEntryBuffer buffer; private final long leaderTerm; + private final int snapshotChunkMaxSize; private volatile boolean sending = true; @@ -78,12 +66,11 @@ public class LogAppender extends Daemon { this.server = server; this.leaderState = leaderState; this.raftLog = server.getState().getLog(); - this.maxBufferSize = server.getProperties().getInt( - RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY, - RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT); - this.batchSending = server.getProperties().getBoolean( - RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY, - RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT); + + final RaftProperties properties = server.getProperties(); + this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferCapacity(properties::getInt); + this.batchSending = RaftServerConfigKeys.Log.Appender.batchEnabled(properties::getBoolean); + this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties::getInt); this.buffer = new LogEntryBuffer(); this.leaderTerm = server.getState().getCurrentTerm(); } @@ -258,14 +245,17 @@ public class LogAppender extends Daemon { currentFileInfo = files.get(fileIndex); File snapshotFile = currentFileInfo.getPath().toFile(); currentFileSize = snapshotFile.length(); - final int bufLength = - (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize); + final int bufLength = getSnapshotChunkLength(currentFileSize); currentBuf = new byte[bufLength]; currentOffset = 0; chunkIndex = 0; in = new FileInputStream(snapshotFile); } + private int getSnapshotChunkLength(long len) { + return len < snapshotChunkMaxSize? (int)len: snapshotChunkMaxSize; + } + @Override public Iterator<InstallSnapshotRequestProto> iterator() { return new Iterator<InstallSnapshotRequestProto>() { @@ -279,8 +269,8 @@ public class LogAppender extends Daemon { if (fileIndex >= files.size()) { throw new NoSuchElementException(); } - int targetLength = (int) Math.min(currentFileSize - currentOffset, - leaderState.getSnapshotChunkMaxSize()); + final int targetLength = getSnapshotChunkLength( + currentFileSize - currentOffset); FileChunkProto chunk; try { chunk = readFileChunk(currentFileInfo, in, currentBuf, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3c7c0e1..98e37e5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -92,12 +92,8 @@ public class RaftServerImpl implements RaftServer { RaftConfiguration raftConf, RaftProperties properties, Parameters parameters) throws IOException { this.lifeCycle = new LifeCycle(id); - minTimeoutMs = properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); - maxTimeoutMs = properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); + minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMinMs(properties::getInt); + maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMaxMs(properties::getInt); Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs, "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); this.properties = properties; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 4b7efbd..4ad099d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -17,21 +17,14 @@ */ package org.apache.ratis.server.impl; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY; - -import java.io.Closeable; -import java.io.IOException; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.server.storage.MemoryRaftLog; -import org.apache.ratis.server.storage.RaftLog; -import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.server.storage.SegmentedRaftLog; -import org.apache.ratis.server.storage.SnapshotManager; +import org.apache.ratis.server.storage.*; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SnapshotInfo; @@ -39,8 +32,8 @@ import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.ProtoUtils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.io.Closeable; +import java.io.IOException; /** * Common states of a raft peer. Protected by RaftServer's lock. @@ -131,8 +124,7 @@ public class ServerState implements Closeable { private RaftLog initLog(RaftPeerId id, RaftProperties prop, RaftServerImpl server, long lastIndexInSnapshot) throws IOException { final RaftLog log; - if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY, - RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) { + if (RaftServerConfigKeys.Log.useMemory(prop::getBoolean)) { log = new MemoryRaftLog(id); } else { log = new SegmentedRaftLog(id, server, this.storage, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index a2a4149..fb153cb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -64,7 +64,7 @@ class StateMachineUpdater implements Runnable { private volatile long lastAppliedIndex; private final boolean autoSnapshotEnabled; - private final long snapshotThreshold; + private final long autoSnapshotThreshold; private long lastSnapshotIndex; private final Thread updater; @@ -80,12 +80,8 @@ class StateMachineUpdater implements Runnable { this.lastAppliedIndex = lastAppliedIndex; lastSnapshotIndex = lastAppliedIndex; - autoSnapshotEnabled = properties.getBoolean( - RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY, - RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT); - snapshotThreshold = properties.getLong( - RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY, - RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT); + autoSnapshotEnabled = RaftServerConfigKeys.Snapshot.autoTriggerEnabled(properties::getBoolean); + autoSnapshotThreshold = RaftServerConfigKeys.Snapshot.autoTriggerThreshold(properties::getLong); updater = new Daemon(this); } @@ -205,7 +201,7 @@ class StateMachineUpdater implements Runnable { private boolean shouldTakeSnapshot(long currentAppliedIndex) { return autoSnapshotEnabled && (state != State.RELOAD) && - (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold); + (currentAppliedIndex - lastSnapshotIndex >= autoSnapshotThreshold); } long getLastAppliedIndex() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java index 6ed70fa..6a0f5da 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java @@ -17,22 +17,8 @@ */ package org.apache.ratis.server.storage; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_KEY; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.zip.Checksum; - import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; @@ -41,6 +27,14 @@ import org.apache.ratis.util.RaftUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.zip.Checksum; + public class LogOutputStream implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LogOutputStream.class); @@ -67,18 +61,14 @@ public class LogOutputStream implements Closeable { throws IOException { this.file = file; this.checksum = new PureJavaCrc32C(); - this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, - RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT); - this.preallocatedSize = properties.getLong( - RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, - RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT); + this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties::getLong); + this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties::getInt); RandomAccessFile rp = new RandomAccessFile(file, "rw"); fc = rp.getChannel(); fc.position(fc.size()); preallocatedPos = fc.size(); - int bufferSize = properties.getInt(RAFT_LOG_WRITE_BUFFER_SIZE_KEY, - RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT); + final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties::getInt); out = new BufferedWriteChannel(fc, bufferSize); if (!append) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index ae71cc1..a78d117 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -17,9 +17,6 @@ */ package org.apache.ratis.server.storage; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_KEY; - import java.io.File; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; @@ -28,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.io.nativeio.NativeIO; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo; @@ -78,8 +76,7 @@ class RaftLogWorker implements Runnable { this.raftServer = raftServer; this.storage = storage; this.properties = properties; - this.forceSyncNum = properties.getInt(RAFT_LOG_FORCE_SYNC_NUM_KEY, - RAFT_LOG_FORCE_SYNC_NUM_DEFAULT); + this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties::getInt); workerThread = new Thread(this, getClass().getSimpleName() + " for " + storage); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java index 09ea55c..2b73b0a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java @@ -18,8 +18,8 @@ package org.apache.ratis.server.storage; import com.google.common.base.Preconditions; - import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState; import org.apache.ratis.statemachine.SnapshotInfo; @@ -28,9 +28,6 @@ import org.apache.ratis.util.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY; - import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -47,8 +44,7 @@ public class RaftStorage implements Closeable { public RaftStorage(RaftProperties prop, RaftServerConstants.StartupOption option) throws IOException { - final String dir = prop.get(RAFT_SERVER_STORAGE_DIR_KEY, - RAFT_SERVER_STORAGE_DIR_DEFAULT); + final String dir = RaftServerConfigKeys.storageDir(prop::getTrimmed); storageDir = new RaftStorageDirectory( new File(FileUtils.stringAsURI(dir).getPath())); if (option == RaftServerConstants.StartupOption.FORMAT) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 870d802..90b5f8d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -17,17 +17,12 @@ */ package org.apache.ratis.server.storage; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.commons.io.Charsets; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; @@ -36,8 +31,10 @@ import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.CodeInjectionForTesting; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; /** * The RaftLog implementation that writes log entries into segmented files in @@ -107,8 +104,7 @@ public class SegmentedRaftLog extends RaftLog { throws IOException { super(selfId); this.storage = storage; - this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, - RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT); + this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties::getLong); cache = new RaftLogCache(); fileLogWorker = new RaftLogWorker(server, storage, properties); lastCommitted.set(lastIndexInSnapshot); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 6d14c24..0f87da1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -47,8 +47,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; - public abstract class MiniRaftCluster { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); public static final DelayLocalExecutionInjection logSyncDelay = @@ -199,9 +197,7 @@ public abstract class MiniRaftCluster { } public int getMaxTimeout() { - return properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); + return RaftServerConfigKeys.Rpc.timeoutMaxMs(properties::getInt); } public RaftConfiguration getConf() { @@ -215,7 +211,7 @@ public abstract class MiniRaftCluster { formatDir(dirStr); } final RaftProperties prop = new RaftProperties(properties); - prop.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr); + RaftServerConfigKeys.setStorageDir(prop::set, dirStr); final StateMachine stateMachine = getStateMachine4Test(properties); return newRaftServer(id, stateMachine, conf, prop); } catch (IOException e) { @@ -433,7 +429,7 @@ public abstract class MiniRaftCluster { // least ELECTION_TIMEOUT_MIN. In this way when the target leader request a // vote, all non-leader servers can grant the vote. // Disable the target leader server RPC so that it can request a vote. - blockQueueAndSetDelay(leaderId, RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); + blockQueueAndSetDelay(leaderId, RaftServerConfigKeys.Rpc.TIMEOUT_MIN_MS_DEFAULT); // Reopen queues so that the vote can make progress. blockQueueAndSetDelay(leaderId, 0); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 6b2e34c..3ada880 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -230,7 +230,7 @@ public class RaftTestUtil { public static void block(BooleanSupplier isBlocked) throws InterruptedException { for(; isBlocked.getAsBoolean(); ) { - Thread.sleep(RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); + Thread.sleep(RaftServerConfigKeys.Rpc.TIMEOUT_MAX_MS_DEFAULT); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 90cbef5..00e12d2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -64,17 +64,17 @@ public abstract class RaftReconfigurationBaseTest { private static final ClientId clientId = ClientId.createId(); + static final int STAGING_CATCHUP_GAP = 10; @BeforeClass public static void setup() { // set a small gap for tests - prop.setInt(RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, 10); + RaftServerConfigKeys.setStagingCatchupGap(prop::setInt, STAGING_CATCHUP_GAP); } public abstract MiniRaftCluster getCluster(int peerNum) throws IOException; private static int getStagingGap() { - return prop.getInt(RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, - RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT); + return STAGING_CATCHUP_GAP; } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index 3ed2596..718c277 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.ratis.conf.ConfUtils.requireMin; + public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { static final Logger LOG = LoggerFactory.getLogger(MiniRaftClusterWithSimulatedRpc.class); @@ -47,7 +49,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { } final int simulateLatencyMs = ConfUtils.getInt(prop::getInt, SimulatedRequestReply.SIMULATE_LATENCY_KEY, - SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT, 0, null); + SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT, requireMin(0)); final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply = new SimulatedRequestReply<>(simulateLatencyMs); final SimulatedClientRpc client2serverRequestReply http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java index 64aaeac..75e49ea 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java @@ -41,7 +41,7 @@ class SimulatedRequestReply<REQUEST extends RaftRpcMessage, public static final String SIMULATE_LATENCY_KEY = SimulatedRequestReply.class.getName() + ".simulateLatencyMs"; public static final int SIMULATE_LATENCY_DEFAULT - = RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; + = RaftServerConfigKeys.Rpc.TIMEOUT_MIN_MS_DEFAULT; public static final long TIMEOUT = 3000L; private static class ReplyOrException<REPLY> { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java index f72d007..026d1c1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java @@ -17,24 +17,12 @@ */ package org.apache.ratis.server.storage; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ChecksumException; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; @@ -48,6 +36,16 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.ratis.server.RaftServerConfigKeys.MB; + /** * Test basic functionality of LogReader, LogInputStream, and LogOutputStream. */ @@ -64,7 +62,7 @@ public class TestRaftLogReadWrite { public void setup() throws Exception { storageDir = RaftTestUtil.getTestDir(TestRaftLogReadWrite.class); properties = new RaftProperties(); - properties.set(RAFT_SERVER_STORAGE_DIR_KEY, + RaftServerConfigKeys.setStorageDir(properties::set, FileUtils.fileAsURI(storageDir).toString()); } @@ -174,7 +172,7 @@ public class TestRaftLogReadWrite { out.flush(); // make sure the file contains padding - Assert.assertEquals(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT, + Assert.assertEquals(RaftServerConfigKeys.Log.PREALLOCATED_SIZE_DEFAULT, openSegment.length()); // check if the reader can correctly read the log file @@ -192,8 +190,8 @@ public class TestRaftLogReadWrite { */ @Test public void testReadWithCorruptPadding() throws IOException { - properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 4 * 1024 * 1024); - properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 16 * 1024 * 1024); + RaftServerConfigKeys.Log.setPreallocatedSize(properties::setInt, 4*MB); + RaftServerConfigKeys.Log.setSegmentSizeMax(properties::setLong, 16*MB); RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); File openSegment = storage.getStorageDir().getOpenLogFile(0); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index fa72f64..5ac5db3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -17,9 +17,8 @@ */ package org.apache.ratis.server.storage; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.KB; +import static org.apache.ratis.server.RaftServerConfigKeys.MB; import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; import static org.apache.ratis.server.storage.LogSegment.getEntrySize; @@ -57,7 +56,7 @@ public class TestRaftLogSegment { @Before public void setup() throws Exception { storageDir = RaftTestUtil.getTestDir(TestRaftLogSegment.class); - properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, + RaftServerConfigKeys.setStorageDir(properties::set, storageDir.getCanonicalPath()); } @@ -212,13 +211,10 @@ public class TestRaftLogSegment { SegmentedRaftLog.HEADER_BYTES.length, term); } - private RaftProperties getProperties(long maxSegmentSize, - long preallocatedSize) { + private RaftProperties getProperties(long maxSegmentSize, int preallocatedSize) { RaftProperties p = new RaftProperties(); - p.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, - maxSegmentSize); - p.setLong(RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, - preallocatedSize); + RaftServerConfigKeys.Log.setSegmentSizeMax(p::setLong, maxSegmentSize); + RaftServerConfigKeys.Log.setPreallocatedSize(p::setInt, preallocatedSize); return p; } @@ -237,7 +233,7 @@ public class TestRaftLogSegment { for (int a : preallocated) { try (LogOutputStream ignored = new LogOutputStream(file, false, getProperties(max, a))) { - Assert.assertEquals(file.length(), Math.min(max, a)); + Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a)); } try (LogInputStream in = new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) { @@ -275,10 +271,10 @@ public class TestRaftLogSegment { */ @Test public void testPreallocationAndAppend() throws Exception { - final long max = 2 * 1024 * 1024; - properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, max); - properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024); - properties.setLong(RAFT_LOG_WRITE_BUFFER_SIZE_KEY, 10 * 1024); + final long max = 2*MB; + RaftServerConfigKeys.Log.setSegmentSizeMax(properties::setLong, max); + RaftServerConfigKeys.Log.setPreallocatedSize(properties::setInt, 16*KB); + RaftServerConfigKeys.Log.setWriteBufferSize(properties::setInt, 10*KB); RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); final File file = storage.getStorageDir().getOpenLogFile(0); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java index a51e933..cb375c6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java @@ -49,7 +49,7 @@ public class TestRaftStorage { @Before public void setup() throws Exception { storageDir = RaftTestUtil.getTestDir(TestRaftStorage.class); - properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, + RaftServerConfigKeys.setStorageDir(properties::set, storageDir.getCanonicalPath()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 9b88321..fb4fe16 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -17,17 +17,6 @@ */ package org.apache.ratis.server.storage; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.function.Supplier; - import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; @@ -48,6 +37,16 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import static org.apache.ratis.server.RaftServerConfigKeys.KB; + public class TestSegmentedRaftLog { static { RaftUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG); @@ -81,7 +80,7 @@ public class TestSegmentedRaftLog { public void setup() throws Exception { storageDir = RaftTestUtil.getTestDir(TestSegmentedRaftLog.class); properties = new RaftProperties(); - properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, + RaftServerConfigKeys.setStorageDir(properties::set, storageDir.getCanonicalPath()); storage = new RaftStorage(properties, RaftServerConstants.StartupOption.REGULAR); } @@ -191,8 +190,8 @@ public class TestSegmentedRaftLog { */ @Test public void testAppendAndRoll() throws Exception { - properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024); - properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 128 * 1024); + RaftServerConfigKeys.Log.setPreallocatedSize(properties::setInt, 16*KB); + RaftServerConfigKeys.Log.setSegmentSizeMax(properties::setLong, 128*KB); List<SegmentRange> ranges = prepareRanges(1, 1024, 0); final byte[] content = new byte[1024]; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 217d5ae..c08931a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -17,13 +17,6 @@ */ package org.apache.ratis.statemachine; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; - -import java.io.File; -import java.util.List; - import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; @@ -34,6 +27,7 @@ import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.simulation.RequestHandler; @@ -50,6 +44,11 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.List; + +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; + public abstract class RaftSnapshotBaseTest { static { RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); @@ -92,9 +91,9 @@ public abstract class RaftSnapshotBaseTest { final RaftProperties prop = new RaftProperties(); prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); - prop.setLong(RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY, - SNAPSHOT_TRIGGER_THRESHOLD); - prop.setBoolean(RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY, true); + RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold( + prop::setLong, SNAPSHOT_TRIGGER_THRESHOLD); + RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop::setBoolean, true); this.cluster = getFactory().newCluster(1, prop); cluster.start(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/b5a07be0/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index be34cc5..c8bf76e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -17,21 +17,6 @@ */ package org.apache.ratis.statemachine; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; @@ -44,13 +29,22 @@ import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.util.RaftUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.Timeout; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.junit.Assert.*; + /** * Test StateMachine related functionality */ @@ -65,7 +59,8 @@ public class TestStateMachine { private final RaftProperties properties = new RaftProperties(); { // TODO: fix and run with in-memory log. It fails with NPE - properties.setBoolean(RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY, false); + // TODO: if change setUseMemory to true + RaftServerConfigKeys.Log.setUseMemory(properties::setBoolean, false); } private MiniRaftClusterWithSimulatedRpc cluster;
