Repository: incubator-ratis Updated Branches: refs/heads/master b5a07be08 -> 4e5d0d795
RATIS-36. Use ConfUtils for other ConfigKeys. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4e5d0d79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4e5d0d79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4e5d0d79 Branch: refs/heads/master Commit: 4e5d0d7952484b7ca880b7ce4b7e9f557ba08654 Parents: b5a07be Author: Jing Zhao <[email protected]> Authored: Wed Mar 8 15:18:11 2017 -0800 Committer: Jing Zhao <[email protected]> Committed: Wed Mar 8 15:18:11 2017 -0800 ---------------------------------------------------------------------- ratis-client/pom.xml | 1 - .../org/apache/ratis/client/RaftClient.java | 7 +- .../ratis/client/RaftClientConfigKeys.java | 26 ++- .../ratis/client/impl/ClientImplUtils.java | 3 +- .../ratis/client/impl/RaftClientImpl.java | 7 +- .../java/org/apache/ratis/RaftConfigKeys.java | 15 +- .../java/org/apache/ratis/conf/ConfUtils.java | 108 ++++++++++--- .../org/apache/ratis/conf/RaftProperties.java | 99 +++--------- .../apache/ratis/util/CheckedBiConsumer.java | 30 ++++ .../java/org/apache/ratis/util/SizeInBytes.java | 82 ++++++++++ .../org/apache/ratis/util/TimeDuration.java | 141 ++++++++++++++++ .../ratis/util/TraditionalBinaryPrefix.java | 162 +++++++++++++++++++ .../org/apache/ratis/util/TestTimeDuration.java | 84 ++++++++++ .../ratis/util/TestTraditionalBinaryPrefix.java | 145 +++++++++++++++++ ratis-grpc/pom.xml | 2 - .../org/apache/ratis/grpc/GrpcConfigKeys.java | 96 +++++++++++ .../org/apache/ratis/grpc/RaftGRpcService.java | 8 +- .../apache/ratis/grpc/RaftGrpcConfigKeys.java | 47 ------ .../ratis/grpc/client/AppendStreamer.java | 34 ++-- .../ratis/grpc/client/RaftOutputStream.java | 17 +- .../ratis/grpc/server/GRpcLogAppender.java | 6 +- .../ratis/grpc/MiniRaftClusterWithGRpc.java | 4 +- .../org/apache/ratis/grpc/TestRaftStream.java | 12 +- ratis-hadoop/pom.xml | 1 - 
.../ratis/hadooprpc/HadoopConfigKeys.java | 37 ++--- .../apache/ratis/hadooprpc/HadoopFactory.java | 2 +- .../hadooprpc/server/HadoopRpcService.java | 4 +- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 6 +- .../org/apache/ratis/netty/NettyConfigKeys.java | 18 +-- .../ratis/netty/server/NettyRpcService.java | 2 +- .../ratis/netty/MiniRaftClusterWithNetty.java | 4 +- .../MiniRaftClusterWithSimulatedRpc.java | 2 +- 32 files changed, 958 insertions(+), 254 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-client/pom.xml b/ratis-client/pom.xml index 4c3dd58..7a070a9 100644 --- a/ratis-client/pom.xml +++ b/ratis-client/pom.xml @@ -37,7 +37,6 @@ <dependency> <artifactId>ratis-common</artifactId> <groupId>org.apache.ratis</groupId> - <scope>provided</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index bf85386..8f3c465 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -19,6 +19,7 @@ package org.apache.ratis.client; import org.apache.ratis.client.impl.ClientImplUtils; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +64,7 @@ public interface RaftClient extends Closeable { private Collection<RaftPeer> servers; private RaftPeerId leaderId; private RaftProperties properties; - 
private int retryInterval = RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT; + private TimeDuration retryInterval = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT; private Builder() {} @@ -73,9 +74,7 @@ public interface RaftClient extends Closeable { clientId = ClientId.createId(); } if (properties != null) { - retryInterval = properties.getInt( - RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_KEY, - RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT); + retryInterval = RaftClientConfigKeys.Rpc.timeout(properties); } return ClientImplUtils.newRaftClient(clientId, Objects.requireNonNull(servers, "The 'servers' field is not initialized."), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java index e1e1593..9e7bd76 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java @@ -17,7 +17,29 @@ */ package org.apache.ratis.client; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.util.TimeDuration; + +import java.util.concurrent.TimeUnit; + +import static org.apache.ratis.conf.ConfUtils.*; + public interface RaftClientConfigKeys { - String RAFT_RPC_TIMEOUT_MS_KEY = "raft.rpc.timeout.ms"; - int RAFT_RPC_TIMEOUT_MS_DEFAULT = 300; + String PREFIX = "raft.client"; + + interface Rpc { + String PREFIX = RaftClientConfigKeys.PREFIX + ".rpc"; + + String TIMEOUT_KEY = PREFIX + ".timeout"; + TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS); + + static TimeDuration timeout(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()), + TIMEOUT_KEY, 
TIMEOUT_DEFAULT); + } + } + + static void main(String[] args) { + printAll(RaftClientConfigKeys.class); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index 85901db..882aa41 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -19,6 +19,7 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -29,7 +30,7 @@ import java.util.Collection; public class ClientImplUtils { public static RaftClient newRaftClient( ClientId clientId, Collection<RaftPeer> peers, RaftPeerId leaderId, - RaftClientRpc clientRpc, int retryInterval) { + RaftClientRpc clientRpc, TimeDuration retryInterval) { return new RaftClientImpl(clientId, peers, leaderId, clientRpc, retryInterval); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index be22305..4e5db47 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -19,6 +19,7 @@ package 
org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.*; import org.apache.ratis.util.RaftUtils; @@ -40,13 +41,13 @@ final class RaftClientImpl implements RaftClient { private final ClientId clientId; private final RaftClientRpc clientRpc; private final Collection<RaftPeer> peers; - private final int retryInterval; + private final TimeDuration retryInterval; private volatile RaftPeerId leaderId; RaftClientImpl(ClientId clientId, Collection<RaftPeer> peers, RaftPeerId leaderId, RaftClientRpc clientRpc, - int retryInterval) { + TimeDuration retryInterval) { this.clientId = clientId; this.clientRpc = clientRpc; this.peers = peers; @@ -99,7 +100,7 @@ final class RaftClientImpl implements RaftClient { // sleep and then retry try { - Thread.sleep(retryInterval); + retryInterval.sleep(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw RaftUtils.toInterruptedIOException( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java index a8bc57d..b67acaf 100644 --- a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java +++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java @@ -17,13 +17,12 @@ */ package org.apache.ratis; -import org.apache.ratis.conf.ConfUtils; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.util.RaftUtils; -import java.util.function.BiConsumer; +import static org.apache.ratis.conf.ConfUtils.*; public interface RaftConfigKeys { 
String PREFIX = "raft"; @@ -35,7 +34,7 @@ public interface RaftConfigKeys { String TYPE_DEFAULT = SupportedRpcType.GRPC.name(); static RpcType type(RaftProperties properties) { - final String t = ConfUtils.get(properties::get, TYPE_KEY, TYPE_DEFAULT); + final String t = get(properties::get, TYPE_KEY, TYPE_DEFAULT); try { // Try parsing it as a SupportedRpcType return SupportedRpcType.valueOfIgnoreCase(t); @@ -47,8 +46,12 @@ public interface RaftConfigKeys { RaftUtils.getClass(t, properties, RpcType.class)); } - static void setType(BiConsumer<String, String> setRpcType, RpcType type) { - ConfUtils.set(setRpcType, TYPE_KEY, type.name()); + static void setType(RaftProperties properties, RpcType type) { + set(properties::set, TYPE_KEY, type.name()); } } + + static void main(String[] args) { + printAll(RaftConfigKeys.class); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java index 34cdbc3..d05c0f8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java @@ -17,7 +17,10 @@ */ package org.apache.ratis.conf; +import org.apache.ratis.util.CheckedBiConsumer; import org.apache.ratis.util.NetUtils; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +79,26 @@ public interface ConfUtils { }; } + static BiConsumer<String, TimeDuration> requireNonNegativeTimeDuration() { + return (key, value) -> { + if (value.isNegative()) { + throw new IllegalArgumentException( + key + " = " + value + " is negative."); + } + }; + } + + static BiFunction<String, Long, Integer> requireInt() { + return (key, value) 
-> { + try { + return Math.toIntExact(value); + } catch (ArithmeticException ae) { + throw new IllegalArgumentException( + "Failed to cast " + key + " = " + value + " to int.", ae); + } + }; + } + static boolean getBoolean( BiFunction<String, Boolean, Boolean> booleanGetter, String key, boolean defaultValue, BiConsumer<String, Boolean>... assertions) { @@ -94,6 +117,22 @@ public interface ConfUtils { return get(longGetter, key, defaultValue, assertions); } + static SizeInBytes getSizeInBytes( + BiFunction<String, SizeInBytes, SizeInBytes> getter, + String key, SizeInBytes defaultValue, BiConsumer<String, SizeInBytes>... assertions) { + final SizeInBytes value = get(getter, key, defaultValue, assertions); + requireMin(0L).accept(key, value.getSize()); + return value; + } + + static TimeDuration getTimeDuration( + BiFunction<String, TimeDuration, TimeDuration> getter, + String key, TimeDuration defaultValue, BiConsumer<String, TimeDuration>... assertions) { + final TimeDuration value = get(getter, key, defaultValue, assertions); + requireNonNegativeTimeDuration().accept(key, value); + return value; + } + static <T> T get(BiFunction<String, T, T> getter, String key, T defaultValue, BiConsumer<String, T>... assertions) { final T value = getter.apply(key, defaultValue); @@ -126,6 +165,22 @@ public interface ConfUtils { set(longSetter, key, value, assertions); } + static void setSizeInBytes( + BiConsumer<String, String> stringSetter, String key, SizeInBytes value, + BiConsumer<String, Long>... assertions) { + final long v = value.getSize(); + Arrays.asList(assertions).forEach(a -> a.accept(key, v)); + set(stringSetter, key, value.getInput()); + } + + static void setSizeInBytesInt( + BiConsumer<String, String> stringSetter, String key, SizeInBytes value, + BiConsumer<String, Integer>... 
assertions) { + final int v = value.getSizeInt(); + Arrays.asList(assertions).forEach(a -> a.accept(key, v)); + set(stringSetter, key, value.getInput()); + } + static <T> void set( BiConsumer<String, T> setter, String key, T value, BiConsumer<String, T>... assertions) { @@ -152,42 +207,57 @@ public interface ConfUtils { out.accept("WARNING: Found non-static field " + f); return; } - final String fieldName = f.getName(); - if (!fieldName.endsWith("_KEY")) { - if (!fieldName.endsWith("_DEFAULT")) { - try { - out.accept("constant: " + fieldName + " = " + f.get(null)); - } catch (IllegalAccessException e) { - out.accept(fieldName + " is not public"); - } - } + if (printKey(confClass, out, f, "KEY", "DEFAULT", + (b, defaultField) -> + b.append(defaultField.getType().getSimpleName()).append(", ") + .append("default=" + defaultField.get(null)))) { + return; + } + if (printKey(confClass, out, f, "PARAMETER", "CLASS", + (b, classField) -> b.append(classField.get(null)))) { return; } + final String fieldName = f.getName(); + try { + out.accept("constant: " + fieldName + " = " + f.get(null)); + } catch (IllegalAccessException e) { + out.accept("WARNING: Failed to access " + f); + } + } + static boolean printKey( + Class<?> confClass, Consumer<Object> out, Field f, String KEY, String DEFAULT, + CheckedBiConsumer<StringBuilder, Field, IllegalAccessException> processDefault) { + final String fieldName = f.getName(); + if (fieldName.endsWith("_" + DEFAULT)) { + return true; + } + if (!fieldName.endsWith("_" + KEY)) { + return false; + } final StringBuilder b = new StringBuilder(); try { final Object keyName = f.get(null); - b.append("key: ").append(keyName); + b.append(KEY.toLowerCase()).append(": ").append(keyName); } catch (IllegalAccessException e) { - out.accept("WARNING: Failed to access key " + f); + out.accept("WARNING: Failed to access " + fieldName); b.append(fieldName + " is not public"); } - final String defaultFieldName = fieldName.substring(0, fieldName.length() - 4) 
+ "_DEFAULT"; + final int len = fieldName.length() - KEY.length(); + final String defaultFieldName = fieldName.substring(0, len) + DEFAULT; b.append(" ("); try { final Field defaultField = confClass.getDeclaredField(defaultFieldName); - b.append(defaultField.getType().getSimpleName()).append(", "); - - final Object defaultValue = defaultField.get(null); - b.append("default=").append(defaultValue); + processDefault.accept(b, defaultField); } catch (NoSuchFieldException e) { - out.accept("WARNING: Default value not found for field " + f); - b.append("default not found"); + out.accept("WARNING: " + DEFAULT + " not found for field " + f); + b.append(DEFAULT).append(" not found"); } catch (IllegalAccessException e) { - out.accept("WARNING: Failed to access default value " + f); + out.accept("WARNING: Failed to access " + defaultFieldName); b.append(defaultFieldName).append(" is not public"); } b.append(")"); out.accept(b); + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java index fc93398..ea917d1 100644 --- a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java +++ b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java @@ -20,7 +20,9 @@ package org.apache.ratis.conf; import com.google.common.base.Preconditions; +import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.StringUtils; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.*; @@ -44,6 +46,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import 
java.util.function.BiFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -639,6 +642,12 @@ public class RaftProperties { return Long.parseLong(valueString); } + /** @return property value; if it is not set, return the default value. */ + public SizeInBytes getSizeInBytes(String name, SizeInBytes defaultValue) { + final String valueString = getTrimmed(name); + return valueString == null? defaultValue: SizeInBytes.valueOf(valueString); + } + private String getHexDigits(String value) { boolean negative = false; String str = value; @@ -784,65 +793,14 @@ public class RaftProperties { : Enum.valueOf(defaultValue.getDeclaringClass(), val); } - enum ParsedTimeDuration { - NS { - TimeUnit unit() { return TimeUnit.NANOSECONDS; } - String suffix() { return "ns"; } - }, - US { - TimeUnit unit() { return TimeUnit.MICROSECONDS; } - String suffix() { return "us"; } - }, - MS { - TimeUnit unit() { return TimeUnit.MILLISECONDS; } - String suffix() { return "ms"; } - }, - S { - TimeUnit unit() { return TimeUnit.SECONDS; } - String suffix() { return "s"; } - }, - M { - TimeUnit unit() { return TimeUnit.MINUTES; } - String suffix() { return "m"; } - }, - H { - TimeUnit unit() { return TimeUnit.HOURS; } - String suffix() { return "h"; } - }, - D { - TimeUnit unit() { return TimeUnit.DAYS; } - String suffix() { return "d"; } - }; - abstract TimeUnit unit(); - abstract String suffix(); - static ParsedTimeDuration unitFor(String s) { - for (ParsedTimeDuration ptd : values()) { - // iteration order is in decl order, so SECONDS matched last - if (s.endsWith(ptd.suffix())) { - return ptd; - } - } - return null; - } - static ParsedTimeDuration unitFor(TimeUnit unit) { - for (ParsedTimeDuration ptd : values()) { - if (ptd.unit() == unit) { - return ptd; - } - } - return null; - } - } - /** * Set the value of <code>name</code> to the given time duration. 
This * is equivalent to <code>set(<name>, value + <time suffix>)</code>. * @param name Property name * @param value Time duration - * @param unit Unit of time */ - public void setTimeDuration(String name, long value, TimeUnit unit) { - set(name, value + ParsedTimeDuration.unitFor(unit).suffix()); + public void setTimeDuration(String name, TimeDuration value) { + set(name, value.toString()); } /** @@ -851,37 +809,24 @@ public class RaftProperties { * (ms), seconds (s), minutes (m), hours (h), and days (d). * @param name Property name * @param defaultValue Value returned if no mapping exists. - * @param unit Unit to convert the stored property, if it exists. * @throws NumberFormatException If the property stripped of its unit is not * a number */ - public long getTimeDuration(String name, long defaultValue, TimeUnit unit) { - String vStr = get(name); - if (null == vStr) { + public TimeDuration getTimeDuration( + String name, TimeDuration defaultValue, TimeUnit defaultUnit) { + final String value = getTrimmed(name); + if (null == value) { return defaultValue; } - vStr = vStr.trim(); - return getTimeDurationHelper(name, vStr, unit); - } - - private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) { - ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr); - if (null == vUnit) { - LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit); - vUnit = ParsedTimeDuration.unitFor(unit); - } else { - vStr = vStr.substring(0, vStr.lastIndexOf(vUnit.suffix())); + try { + return TimeDuration.valueOf(value, defaultUnit); + } catch(NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse " + + name + " = " + value, e); } - return unit.convert(Long.parseLong(vStr), vUnit.unit()); } - - public long[] getTimeDurations(String name, TimeUnit unit) { - String[] strings = getTrimmedStrings(name); - long[] durations = new long[strings.length]; - for (int i = 0; i < strings.length; i++) { - durations[i] = getTimeDurationHelper(name, 
strings[i], unit); - } - return durations; + public BiFunction<String, TimeDuration, TimeDuration> getTimeDuration(TimeUnit defaultUnit) { + return (key, defaultValue) -> getTimeDuration(key, defaultValue, defaultUnit); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/util/CheckedBiConsumer.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedBiConsumer.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedBiConsumer.java new file mode 100644 index 0000000..03256b2 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedBiConsumer.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import java.util.function.BiConsumer; + +/** {@link BiConsumer} with a throws-clause. */ +@FunctionalInterface +public interface CheckedBiConsumer<LEFT, RIGHT, THROWABLE extends Throwable> { + /** + * The same as {@link BiConsumer#accept(Object, Object)} + * except that this method is declared with a throws-clause. 
+ */ + void accept(LEFT left, RIGHT right) throws THROWABLE; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java new file mode 100644 index 0000000..d5306c5 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import java.util.Objects; + +/** + * Size which may be constructed with a {@link TraditionalBinaryPrefix}. 
+ */ +public class SizeInBytes { + public static SizeInBytes valueOf(long size) { + final String s = String.valueOf(size); + return new SizeInBytes(size, s, s); + } + + public static SizeInBytes valueOf(long n, TraditionalBinaryPrefix prefix) { + final long size = Objects.requireNonNull(prefix, "prefix = null").toLong(n); + final String input = n + " " + prefix.getSymbol(); + final String description = input + " (=" + size + ")"; + return new SizeInBytes(size, input, description); + } + + public static SizeInBytes valueOf(String input) { + input = Objects.requireNonNull(input, "input = null").trim(); + + final int last = input.length() - 1; + final String s = "b".equalsIgnoreCase(input.substring(last))? + input.substring(0, last): input; + final long size; + try { + size = TraditionalBinaryPrefix.string2long(s); + } catch(NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse input " + input, e); + } + final String description = input.equals(String.valueOf(size))? 
+ input: input + " (=" + size + ")"; + + return new SizeInBytes(size, input, description); + } + + private final long size; + private final String input; + private final String description; + + private SizeInBytes(long size, String input, String description) { + this.size = size; + this.input = input; + this.description = description; + } + + public long getSize() { + return size; + } + + public int getSizeInt() { + return Math.toIntExact(getSize()); + } + + public String getInput() { + return input; + } + + @Override + public String toString() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java new file mode 100644 index 0000000..d57c115 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.util; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Time duration is represented together with a {@link TimeUnit}. + */ +public class TimeDuration implements Comparable<TimeDuration> { + + public enum Abbreviation { + NANOSECONDS("ns", "nanos"), + MICROSECONDS("us", "μs", "micros"), + MILLISECONDS("ms", "msec", "millis"), + SECONDS("s", "sec"), + MINUTES("m", "min"), + HOURS("h", "hr"), + DAYS("d"); + + private final TimeUnit unit = TimeUnit.valueOf(name()); + private final List<String> symbols; + + Abbreviation(String... symbols) { + final List<String> input = Arrays.asList(symbols); + final List<String> all = new ArrayList<>(input.size() + 2); + input.forEach(s -> all.add(s.toLowerCase())); + + final String s = unit.name().toLowerCase(); + all.add(s); + all.add(s.substring(0, s.length() - 1)); + + this.symbols = Collections.unmodifiableList(all); + } + + public TimeUnit unit() { + return unit; + } + + String getDefault() { + return symbols.get(0); + } + + public List<String> getSymbols() { + return symbols; + } + + public static Abbreviation valueOf(TimeUnit unit) { + return valueOf(unit.name()); + } + } + + public static long parse(String timeString, TimeUnit targetUnit) { + return valueOf(timeString, targetUnit).toLong(targetUnit); + } + + /** + * Parse the given time duration string. + * If there is no unit specified, use the default unit. + */ + public static TimeDuration valueOf(String timeString, TimeUnit defaultUnit) { + final String lower = Objects.requireNonNull(timeString, "timeString = null").trim(); + for(Abbreviation a : Abbreviation.values()) { + for(String s : a.getSymbols()) { + if (lower.endsWith(s)) { + final String value = lower.substring(0, lower.length()-s.length()).trim(); + try { + return valueOf(Long.parseLong(value), a.unit()); + } catch(NumberFormatException e) { + // failed with current symbol; ignore and try next symbol. 
+ } + } + } + } + return valueOf(Long.parseLong(lower), defaultUnit); + } + + public static TimeDuration valueOf(long duration, TimeUnit unit) { + return new TimeDuration(duration, unit); + } + + private final long duration; + private final TimeUnit unit; + + private TimeDuration(long duration, TimeUnit unit) { + this.duration = duration; + this.unit = Objects.requireNonNull(unit, "unit = null"); + } + + public TimeUnit getUnit() { + return unit; + } + + public long toLong(TimeUnit targetUnit) { + return targetUnit.convert(duration, unit); + } + + public boolean isNegative() { + return duration < 0; + } + + public void sleep() throws InterruptedException { + unit.sleep(duration); + } + + @Override + public int compareTo(TimeDuration that) { + if (this.unit.compareTo(that.unit) > 0) { + return that.compareTo(this); + } + // this.unit <= that.unit + final long thisDurationInThatUnit = that.unit.convert(this.duration, this.unit); + if (thisDurationInThatUnit == that.duration) { + final long thatDurationInThisUnit = this.unit.convert(that.duration, that.unit); + return Long.compare(this.duration, thatDurationInThisUnit); + } else { + return Long.compare(thisDurationInThatUnit, that.duration); + } + } + + @Override + public String toString() { + return duration + " " + Abbreviation.valueOf(unit).getDefault(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/util/TraditionalBinaryPrefix.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TraditionalBinaryPrefix.java b/ratis-common/src/main/java/org/apache/ratis/util/TraditionalBinaryPrefix.java new file mode 100644 index 0000000..d9677c8 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/TraditionalBinaryPrefix.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +/** + * The traditional binary prefixes, kilo, mega, ..., exa, + * which can be represented by a 64-bit integer. + * {@link TraditionalBinaryPrefix} symbols are case insensitive. + */ +public enum TraditionalBinaryPrefix { + KILO(10), + MEGA(KILO.bitShift + 10), + GIGA(MEGA.bitShift + 10), + TERA(GIGA.bitShift + 10), + PETA(TERA.bitShift + 10), + EXA (PETA.bitShift + 10); + + private final long value; + private final char symbol; + private final int bitShift; + private final long bitMask; + + TraditionalBinaryPrefix(int bitShift) { + this.bitShift = bitShift; + this.value = 1L << bitShift; + this.bitMask = this.value - 1L; + this.symbol = toString().charAt(0); + } + + public long getValue() { + return value; + } + + public char getSymbol() { + return symbol; + } + + public long toLong(long n) { + final long shifted = n << bitShift; + if (n != shifted >>> bitShift) { + throw new ArithmeticException("Long overflow: " + toString(n) + + " cannot be assigned to a long."); + } + return shifted; + } + + public String toString(long n) { + return n + String.valueOf(symbol); + } + + /** + * @return The object corresponding to the symbol. 
+ */ + public static TraditionalBinaryPrefix valueOf(char symbol) { + symbol = Character.toUpperCase(symbol); + for(TraditionalBinaryPrefix prefix : TraditionalBinaryPrefix.values()) { + if (symbol == prefix.symbol) { + return prefix; + } + } + throw new IllegalArgumentException("Unknown symbol '" + symbol + "'"); + } + + /** + * Convert a string to long. + * The input string is first be trimmed + * and then it is parsed with traditional binary prefix. + * + * For example, + * "-1230k" will be converted to -1230 * 1024 = -1259520; + * "891g" will be converted to 891 * 1024^3 = 956703965184; + * + * @param s input string + * @return a long value represented by the input string. + */ + public static long string2long(String s) { + s = s.trim(); + final int lastpos = s.length() - 1; + final char lastchar = s.charAt(lastpos); + if (Character.isDigit(lastchar)) + return Long.parseLong(s); + else { + long p; + try { + p = TraditionalBinaryPrefix.valueOf(lastchar).value; + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid size prefix '" + lastchar + + "' in '" + s + "'. Allowed prefixes are k, m, g, t, p, e (case insensitive)"); + } + long num = Long.parseLong(s.substring(0, lastpos).trim()); + if (num > Long.MAX_VALUE/p || num < Long.MIN_VALUE/p) { + throw new IllegalArgumentException(s + " does not fit in a Long"); + } + return num * p; + } + } + + /** + * Convert a long integer to a string with traditional binary prefix. + * + * @param n the value to be converted + * @param unit The unit, e.g. "B" for bytes. + * @param decimalPlaces The number of decimal places. + * @return a string with traditional binary prefix. 
+ */ + public static String long2String(long n, String unit, int decimalPlaces) { + if (unit == null) { + unit = ""; + } + //take care a special case + if (n == Long.MIN_VALUE) { + return "-8 " + EXA.symbol + unit; + } + + final StringBuilder b = new StringBuilder(); + //take care negative numbers + if (n < 0) { + b.append('-'); + n = -n; + } + if (n < KILO.value) { + //no prefix + b.append(n); + return (unit.isEmpty()? b: b.append(" ").append(unit)).toString(); + } else { + //find traditional binary prefix + int i = 0; + for(; i < values().length && n >= values()[i].value; i++); + TraditionalBinaryPrefix prefix = values()[i - 1]; + + if ((n & prefix.bitMask) == 0) { + //exact division + b.append(n >> prefix.bitShift); + } else { + final String format = "%." + decimalPlaces + "f"; + String s = StringUtils.format(format, n/(double)prefix.value); + //check a special rounding up case + if (s.startsWith("1024")) { + prefix = values()[i]; + s = StringUtils.format(format, n/(double)prefix.value); + } + b.append(s); + } + return b.append(' ').append(prefix.symbol).append(unit).toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/test/java/org/apache/ratis/util/TestTimeDuration.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/util/TestTimeDuration.java b/ratis-common/src/test/java/org/apache/ratis/util/TestTimeDuration.java new file mode 100644 index 0000000..a724fb3 --- /dev/null +++ b/ratis-common/src/test/java/org/apache/ratis/util/TestTimeDuration.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.ratis.util.TimeDuration.Abbreviation; +import static org.apache.ratis.util.TimeDuration.parse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestTimeDuration { + @Test(timeout = 10000) + public void testTimeDuration() throws Exception { + Arrays.asList(TimeUnit.values()) + .forEach(a -> assertNotNull(Abbreviation.valueOf(a.name()))); + assertEquals(TimeUnit.values().length, Abbreviation.values().length); + + final List<String> allSymbols = Arrays.asList(Abbreviation.values()).stream() + .map(Abbreviation::getSymbols) + .flatMap(List::stream) + .collect(Collectors.toList()); + Arrays.asList(TimeUnit.values()).forEach(unit -> + allSymbols.stream() + .map(s -> "0" + s) + .forEach(s -> assertEquals(s, 0L, parse(s, unit)))); + + assertEquals(1L, parse("1000000 ns", TimeUnit.MILLISECONDS)); + assertEquals(10L, parse("10000000 nanos", TimeUnit.MILLISECONDS)); + assertEquals(100L, parse("100000000 nanosecond", TimeUnit.MILLISECONDS)); + assertEquals(1000L, parse("1000000000 nanoseconds", TimeUnit.MILLISECONDS)); + + assertEquals(1L, parse("1000 us", TimeUnit.MILLISECONDS)); + assertEquals(10L, parse("10000 μs", 
TimeUnit.MILLISECONDS)); + assertEquals(100L, parse("100000 micros", TimeUnit.MILLISECONDS)); + assertEquals(1000L, parse("1000000 microsecond", TimeUnit.MILLISECONDS)); + assertEquals(10000L, parse("10000000 microseconds", TimeUnit.MILLISECONDS)); + + assertEquals(1L, parse("1 ms", TimeUnit.MILLISECONDS)); + assertEquals(10L, parse("10 msec", TimeUnit.MILLISECONDS)); + assertEquals(100L, parse("100 millis", TimeUnit.MILLISECONDS)); + assertEquals(1000L, parse("1000 millisecond", TimeUnit.MILLISECONDS)); + assertEquals(10000L, parse("10000 milliseconds", TimeUnit.MILLISECONDS)); + + assertEquals(1000L, parse("1 s", TimeUnit.MILLISECONDS)); + assertEquals(10000L, parse("10 sec", TimeUnit.MILLISECONDS)); + assertEquals(100000L, parse("100 second", TimeUnit.MILLISECONDS)); + assertEquals(1000000L, parse("1000 seconds", TimeUnit.MILLISECONDS)); + + assertEquals(60, parse("1 m", TimeUnit.SECONDS)); + assertEquals(600, parse("10 min", TimeUnit.SECONDS)); + assertEquals(6000, parse("100 minutes", TimeUnit.SECONDS)); + assertEquals(60000, parse("1000 minutes", TimeUnit.SECONDS)); + + assertEquals(60, parse("1 h", TimeUnit.MINUTES)); + assertEquals(600, parse("10 hr", TimeUnit.MINUTES)); + assertEquals(6000, parse("100 hour", TimeUnit.MINUTES)); + assertEquals(60000, parse("1000 hours", TimeUnit.MINUTES)); + + assertEquals(24, parse("1 d", TimeUnit.HOURS)); + assertEquals(240, parse("10 day", TimeUnit.HOURS)); + assertEquals(2400, parse("100 days", TimeUnit.HOURS)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/test/java/org/apache/ratis/util/TestTraditionalBinaryPrefix.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/util/TestTraditionalBinaryPrefix.java b/ratis-common/src/test/java/org/apache/ratis/util/TestTraditionalBinaryPrefix.java new file mode 100644 index 0000000..ed9aadb --- /dev/null +++ 
b/ratis-common/src/test/java/org/apache/ratis/util/TestTraditionalBinaryPrefix.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.junit.Test; + +import static org.apache.ratis.util.TraditionalBinaryPrefix.long2String; +import static org.apache.ratis.util.TraditionalBinaryPrefix.string2long; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestTraditionalBinaryPrefix { + @Test(timeout = 10000) + public void testTraditionalBinaryPrefix() throws Exception { + //test string2long(..) 
+ String[] symbol = {"k", "m", "g", "t", "p", "e"}; + long m = 1024; + for(String s : symbol) { + assertEquals(0, string2long(0 + s)); + assertEquals(m, string2long(1 + s)); + m *= 1024; + } + + assertEquals(0L, string2long("0")); + assertEquals(1024L, string2long("1k")); + assertEquals(-1024L, string2long("-1k")); + assertEquals(1259520L, string2long("1230K")); + assertEquals(-1259520L, string2long("-1230K")); + assertEquals(104857600L, string2long("100m")); + assertEquals(-104857600L, string2long("-100M")); + assertEquals(956703965184L, string2long("891g")); + assertEquals(-956703965184L, string2long("-891G")); + assertEquals(501377302265856L, string2long("456t")); + assertEquals(-501377302265856L, string2long("-456T")); + assertEquals(11258999068426240L, string2long("10p")); + assertEquals(-11258999068426240L, string2long("-10P")); + assertEquals(1152921504606846976L, string2long("1e")); + assertEquals(-1152921504606846976L, string2long("-1E")); + + String tooLargeNumStr = "10e"; + try { + string2long(tooLargeNumStr); + fail("Test passed for a number " + tooLargeNumStr + " too large"); + } catch (IllegalArgumentException e) { + assertEquals(tooLargeNumStr + " does not fit in a Long", e.getMessage()); + } + + String tooSmallNumStr = "-10e"; + try { + string2long(tooSmallNumStr); + fail("Test passed for a number " + tooSmallNumStr + " too small"); + } catch (IllegalArgumentException e) { + assertEquals(tooSmallNumStr + " does not fit in a Long", e.getMessage()); + } + + String invalidFormatNumStr = "10kb"; + char invalidPrefix = 'b'; + try { + string2long(invalidFormatNumStr); + fail("Test passed for a number " + invalidFormatNumStr + + " has invalid format"); + } catch (IllegalArgumentException e) { + assertEquals("Invalid size prefix '" + invalidPrefix + "' in '" + + invalidFormatNumStr + + "'. Allowed prefixes are k, m, g, t, p, e(case insensitive)", + e.getMessage()); + } + + //test long2string(..) 
+ assertEquals("0", long2String(0, null, 2)); + for(int decimalPlace = 0; decimalPlace < 2; decimalPlace++) { + for(int n = 1; n < TraditionalBinaryPrefix.KILO.getValue(); n++) { + assertEquals(n + "", long2String(n, null, decimalPlace)); + assertEquals(-n + "", long2String(-n, null, decimalPlace)); + } + assertEquals("1 K", long2String(1L << 10, null, decimalPlace)); + assertEquals("-1 K", long2String(-1L << 10, null, decimalPlace)); + } + + assertEquals("8.00 E", long2String(Long.MAX_VALUE, null, 2)); + assertEquals("8.00 E", long2String(Long.MAX_VALUE - 1, null, 2)); + assertEquals("-8 E", long2String(Long.MIN_VALUE, null, 2)); + assertEquals("-8.00 E", long2String(Long.MIN_VALUE + 1, null, 2)); + + final String[] zeros = {" ", ".0 ", ".00 "}; + for(int decimalPlace = 0; decimalPlace < zeros.length; decimalPlace++) { + final String trailingZeros = zeros[decimalPlace]; + + for(int e = 11; e < Long.SIZE - 1; e++) { + final TraditionalBinaryPrefix p + = TraditionalBinaryPrefix.values()[e/10 - 1]; + + { // n = 2^e + final long n = 1L << e; + final String expected = (n/p.getValue()) + " " + p.getSymbol(); + assertEquals("n=" + n, expected, long2String(n, null, 2)); + } + + { // n = 2^e + 1 + final long n = (1L << e) + 1; + final String expected = (n/p.getValue()) + trailingZeros + p.getSymbol(); + assertEquals("n=" + n, expected, long2String(n, null, decimalPlace)); + } + + { // n = 2^e - 1 + final long n = (1L << e) - 1; + final String expected = ((n+1)/p.getValue()) + trailingZeros + p.getSymbol(); + assertEquals("n=" + n, expected, long2String(n, null, decimalPlace)); + } + } + } + + assertEquals("1.50 K", long2String(3L << 9, null, 2)); + assertEquals("1.5 K", long2String(3L << 9, null, 1)); + assertEquals("1.50 M", long2String(3L << 19, null, 2)); + assertEquals("2 M", long2String(3L << 19, null, 0)); + assertEquals("3 G", long2String(3L << 30, null, 2)); + + assertEquals("0 B", byteDescription(0)); + assertEquals("-100 B", byteDescription(-100)); + 
assertEquals("1 KB", byteDescription(1024)); + assertEquals("1.50 KB", byteDescription(3L << 9)); + assertEquals("1.50 MB", byteDescription(3L << 19)); + assertEquals("3 GB", byteDescription(3L << 30)); + } + + private static String byteDescription(long len) { + return long2String(len, "B", 2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-grpc/pom.xml b/ratis-grpc/pom.xml index 6ffad7c..99af552 100644 --- a/ratis-grpc/pom.xml +++ b/ratis-grpc/pom.xml @@ -37,7 +37,6 @@ <dependency> <artifactId>ratis-common</artifactId> <groupId>org.apache.ratis</groupId> - <scope>provided</scope> </dependency> <dependency> <artifactId>ratis-common</artifactId> @@ -49,7 +48,6 @@ <dependency> <artifactId>ratis-client</artifactId> <groupId>org.apache.ratis</groupId> - <scope>provided</scope> </dependency> <dependency> <artifactId>ratis-client</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java new file mode 100644 index 0000000..430fd17 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.client.RaftClientConfigKeys; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; + +import static org.apache.ratis.conf.ConfUtils.*; + +public interface GrpcConfigKeys { + String PREFIX = "raft.grpc"; + + interface Server { + String PREFIX = GrpcConfigKeys.PREFIX + ".server"; + + String PORT_KEY = PREFIX + ".port"; + int PORT_DEFAULT = 0; + static int port(RaftProperties properties) { + return getInt(properties::getInt, + PORT_KEY, PORT_DEFAULT, requireMin(0), requireMax(65536)); + } + static void setPort(RaftProperties properties, int port) { + setInt(properties::setInt, PORT_KEY, port); + } + + String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max"; + SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB"); + static SizeInBytes messageSizeMax(RaftProperties properties) { + return getSizeInBytes(properties::getSizeInBytes, + MESSAGE_SIZE_MAX_KEY, MESSAGE_SIZE_MAX_DEFAULT); + } + + String LEADER_OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".leader.outstanding.appends.max"; + int LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT = 128; + static int leaderOutstandingAppendsMax(RaftProperties properties) { + return getInt(properties::getInt, + LEADER_OUTSTANDING_APPENDS_MAX_KEY, LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT, requireMin(0)); + } + } + + interface OutputStream { + String PREFIX = GrpcConfigKeys.PREFIX + ".outputstream"; + + String BUFFER_SIZE_KEY = PREFIX + ".buffer.size"; + SizeInBytes BUFFER_SIZE_DEFAULT 
= SizeInBytes.valueOf("64KB"); + static SizeInBytes bufferSize(RaftProperties properties) { + return getSizeInBytes(properties::getSizeInBytes, + BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT); + } + static void setBufferSize(RaftProperties properties, SizeInBytes bufferSize) { + setSizeInBytesInt(properties::set, BUFFER_SIZE_KEY, bufferSize); + } + + String RETRY_TIMES_KEY = PREFIX + ".retry.times"; + int RETRY_TIMES_DEFAULT = 5; + static int retryTimes(RaftProperties properties) { + return getInt(properties::getInt, + RETRY_TIMES_KEY, RETRY_TIMES_DEFAULT, requireMin(1)); + } + + String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval"; + TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT; + static TimeDuration retryInterval(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()), + RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT); + } + + String OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".outstanding.appends.max"; + int OUTSTANDING_APPENDS_MAX_DEFAULT = 128; + static int outstandingAppendsMax(RaftProperties properties) { + return getInt(properties::getInt, + OUTSTANDING_APPENDS_MAX_KEY, OUTSTANDING_APPENDS_MAX_DEFAULT, requireMin(0)); + } + } + + static void main(String[] args) { + printAll(GrpcConfigKeys.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index 875efbb..3185a37 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -41,8 +41,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import static 
org.apache.ratis.grpc.RaftGrpcConfigKeys.*; - /** A grpc implementation of {@link RaftServerRpc}. */ public class RaftGRpcService implements RaftServerRpc { static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class); @@ -75,10 +73,8 @@ public class RaftGRpcService implements RaftServerRpc { private RaftGRpcService(RaftServer server) { this(server, - server.getProperties().getInt(RAFT_GRPC_SERVER_PORT_KEY, - RAFT_GRPC_SERVER_PORT_DEFAULT), - server.getProperties().getInt(RAFT_GRPC_MESSAGE_MAXSIZE_KEY, - RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT)); + GrpcConfigKeys.Server.port(server.getProperties()), + GrpcConfigKeys.Server.messageSizeMax(server.getProperties()).getSizeInt()); } private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) { ServerBuilder serverBuilder = ServerBuilder.forPort(port); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java deleted file mode 100644 index ffec8ff..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc; - -import org.apache.ratis.client.RaftClientConfigKeys; - -public interface RaftGrpcConfigKeys { - String PREFIX = "raft.grpc"; - - String RAFT_GRPC_SERVER_PORT_KEY = PREFIX + ".server.port"; - int RAFT_GRPC_SERVER_PORT_DEFAULT = 0; - - String RAFT_GRPC_MESSAGE_MAXSIZE_KEY = PREFIX + ".message.maxsize"; - int RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB - - String RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY = - PREFIX + "leader.max.outstanding.appends"; - int RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT = 128; - - String RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY = - PREFIX + "client.max.outstanding.appends"; - int RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT = 128; - - String RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY = "raft.outputstream.buffer.size"; - int RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT = 64 * 1024; - - String RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY = "raft.outputstream.max.retry.times"; - int RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT = 5; - - String RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY = "raft.outputstream.retry.interval"; - long RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT; -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index 6d7b207..553ec9a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -18,19 +18,15 @@ package org.apache.ratis.grpc.client; import com.google.common.base.Preconditions; - -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.RaftGrpcConfigKeys; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.NotLeaderException; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.RaftUtils; @@ -41,14 +37,11 @@ import java.io.Closeable; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.ratis.client.impl.ClientProtoUtils.*; -import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT; -import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY; public class AppendStreamer implements Closeable { public 
static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class); @@ -59,16 +52,11 @@ public class AppendStreamer implements Closeable { private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>(); private final AtomicInteger retryTimes = new AtomicInteger(0); private final int maxRetryTimes; - private final long retryInterval; + private final TimeDuration retryInterval; ExceptionAndRetry(RaftProperties prop) { - maxRetryTimes = prop.getInt( - RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY, - RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT); - retryInterval = prop.getTimeDuration( - RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY, - RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); + maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop); + retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop); } void addException(RaftPeerId peer, IOException e) { @@ -102,9 +90,7 @@ public class AppendStreamer implements Closeable { AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers, RaftPeerId leaderId, ClientId clientId) { this.clientId = clientId; - maxPendingNum = prop.getInt( - RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY, - RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT); + maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop); dataQueue = new ConcurrentLinkedDeque<>(); ackQueue = new ConcurrentLinkedDeque<>(); exceptionAndRetry = new ExceptionAndRetry(prop); @@ -359,7 +345,7 @@ public class AppendStreamer implements Closeable { dataQueue.poll()); ackQueue.offer(request); try { - Thread.sleep(exceptionAndRetry.retryInterval); + exceptionAndRetry.retryInterval.sleep(); } catch (InterruptedException ignored) { } leaderProxy.onNext(request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java 
---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java index 73e56b8..33d3d22 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java @@ -17,20 +17,18 @@ */ package org.apache.ratis.grpc.client; -import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT; -import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.ProtoUtils; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; + public class RaftOutputStream extends OutputStream { /** internal buffer */ private final byte buf[]; @@ -43,8 +41,7 @@ public class RaftOutputStream extends OutputStream { public RaftOutputStream(RaftProperties prop, ClientId clientId, Collection<RaftPeer> peers, RaftPeerId leaderId) { - final int bufferSize = prop.getInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, - RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT); + final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt(); buf = new byte[bufferSize]; count = 0; this.clientId = clientId; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index 5f1d901..91dc02c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -26,7 +26,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.grpc.RaftGRpcService; -import org.apache.ratis.grpc.RaftGrpcConfigKeys; +import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.server.impl.FollowerInfo; import org.apache.ratis.server.impl.LeaderState; import org.apache.ratis.server.impl.LogAppender; @@ -63,9 +63,7 @@ public class GRpcLogAppender extends LogAppender { RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc(); client = rpcService.getRpcClient(f.getPeer()); - maxPendingRequestsNum = server.getProperties().getInt( - RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY, - RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT); + maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(server.getProperties()); pendingRequests = new ConcurrentLinkedQueue<>(); appendResponseHandler = new AppendLogResponseHandler(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index 9676a48..c3ca707 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -34,7 +34,7 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { @Override public MiniRaftClusterWithGRpc newCluster( String[] ids, RaftProperties prop) { - RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.GRPC); + RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC); return new MiniRaftClusterWithGRpc(ids, prop); } }; @@ -50,7 +50,7 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { protected RaftServerImpl newRaftServer( RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties) throws IOException { - properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(id, conf)); + GrpcConfigKeys.Server.setPort(properties, getPort(id, conf)); return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index d7cb75a..4bae5a9 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -18,11 +18,12 @@ package org.apache.ratis.grpc; import org.apache.log4j.Level; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.grpc.client.AppendStreamer; import org.apache.ratis.grpc.client.RaftOutputStream; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.storage.RaftLog; import 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; @@ -42,7 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import static org.apache.ratis.RaftTestUtil.waitForLeader; -import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY; import static org.junit.Assert.fail; public class TestRaftStream { @@ -81,7 +81,7 @@ public class TestRaftStream { LOG.info("Running testSimpleWrite"); // default 64K is too large for a test - prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4); + GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4)); cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop); cluster.start(); @@ -122,7 +122,7 @@ public class TestRaftStream { public void testWriteAndFlush() throws Exception { LOG.info("Running testWriteAndFlush"); - prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE); + GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE)); cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop); cluster.start(); @@ -200,7 +200,7 @@ public class TestRaftStream { @Test public void testWriteWithOffset() throws Exception { LOG.info("Running testWriteWithOffset"); - prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE); + GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE)); cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop); cluster.start(); @@ -259,7 +259,7 @@ public class TestRaftStream { public void testKillLeader() throws Exception { LOG.info("Running testChangeLeader"); - prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4); + GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4)); cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop); cluster.start(); final RaftServerImpl leader = waitForLeader(cluster); 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-hadoop/pom.xml b/ratis-hadoop/pom.xml index f3be6a6..98ea84f 100644 --- a/ratis-hadoop/pom.xml +++ b/ratis-hadoop/pom.xml @@ -41,7 +41,6 @@ <dependency> <artifactId>ratis-common</artifactId> <groupId>org.apache.ratis</groupId> - <scope>provided</scope> </dependency> <dependency> <artifactId>ratis-common</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java index f65ee5d..5d66480 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java @@ -18,28 +18,25 @@ package org.apache.ratis.hadooprpc; import org.apache.hadoop.conf.Configuration; -import org.apache.ratis.conf.ConfUtils; import org.apache.ratis.conf.Parameters; import java.net.InetSocketAddress; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import static org.apache.ratis.conf.ConfUtils.requireMin; +import static org.apache.ratis.conf.ConfUtils.*; /** Hadoop Rpc specific configuration properties. 
*/ public interface HadoopConfigKeys { - String PREFIX = "raft.hadooprpc"; + String PREFIX = "raft.hadoop"; - String CONF_KEY = PREFIX + ".conf"; + String CONF_PARAMETER = PREFIX + ".conf"; + Class<Configuration> CONF_CLASS = Configuration.class; - static Configuration getConf( - BiFunction<String, Class<Configuration>, Configuration> getConf) { - return getConf.apply(CONF_KEY, Configuration.class); + static Configuration getConf(Parameters parameters) { + return parameters.get(CONF_PARAMETER, CONF_CLASS); } static void setConf(Parameters parameters, Configuration conf) { - parameters.put(CONF_KEY, conf, Configuration.class); + parameters.put(CONF_PARAMETER, conf, Configuration.class); } /** IPC server configurations */ @@ -47,24 +44,28 @@ public interface HadoopConfigKeys { String PREFIX = HadoopConfigKeys.PREFIX + ".ipc"; String ADDRESS_KEY = PREFIX + ".address"; - int DEFAULT_PORT = 10718; - String ADDRESS_DEFAULT = "0.0.0.0:" + DEFAULT_PORT; + int PORT_DEFAULT = 10718; + String ADDRESS_DEFAULT = "0.0.0.0:" + PORT_DEFAULT; String HANDLERS_KEY = PREFIX + ".handlers"; int HANDLERS_DEFAULT = 10; - static int handlers(BiFunction<String, Integer, Integer> getInt) { - return ConfUtils.getInt(getInt, + static int handlers(Configuration conf) { + return getInt(conf::getInt, HANDLERS_KEY, HANDLERS_DEFAULT, requireMin(1)); } - static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) { - return ConfUtils.getInetSocketAddress(getTrimmed, + static InetSocketAddress address(Configuration conf) { + return getInetSocketAddress(conf::getTrimmed, ADDRESS_KEY, ADDRESS_DEFAULT); } - static void setAddress(BiConsumer<String, String> setString, String address) { - ConfUtils.set(setString, ADDRESS_KEY, address); + static void setAddress(Configuration conf, String address) { + set(conf::set, ADDRESS_KEY, address); } } + + static void main(String[] args) { + printAll(HadoopConfigKeys.class); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java index ad866c8..419bfaa 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java @@ -36,7 +36,7 @@ public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFa private final Configuration conf; public HadoopFactory(Parameters parameters) { - this(HadoopConfigKeys.getConf(parameters::get)); + this(HadoopConfigKeys.getConf(parameters)); } public HadoopFactory(Configuration conf) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index 00d69aa..ae8bf37 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -119,8 +119,8 @@ public class HadoopRpcService implements RaftServerRpc { private static RPC.Server newRpcServer( RaftServerProtocol serverProtocol, final Configuration conf) throws IOException { - final int handlerCount = HadoopConfigKeys.Ipc.handlers(conf::getInt); - final InetSocketAddress address = HadoopConfigKeys.Ipc.address(conf::getTrimmed); + final int handlerCount = HadoopConfigKeys.Ipc.handlers(conf); + final InetSocketAddress address = HadoopConfigKeys.Ipc.address(conf); final 
BlockingService service = RaftServerProtocolService.newReflectiveBlockingService( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index b3a607b..7c11f8e 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -52,8 +52,8 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { public MiniRaftClusterWithHadoopRpc newCluster( String[] ids, RaftProperties prop, Configuration conf) { - RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.HADOOP); - HadoopConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0"); + RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.HADOOP); + HadoopConfigKeys.Ipc.setAddress(conf, "0.0.0.0:0"); return new MiniRaftClusterWithHadoopRpc(ids, prop, conf); } } @@ -77,7 +77,7 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { RaftProperties properties) throws IOException { final Configuration hconf = new Configuration(hadoopConf); final String address = "0.0.0.0:" + getPort(id, conf); - HadoopConfigKeys.Ipc.setAddress(hconf::set, address); + HadoopConfigKeys.Ipc.setAddress(hconf, address); return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, HadoopFactory.newRaftParameters(hconf)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java index fd2703d..aa0d9f6 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java @@ -17,13 +17,9 @@ */ package org.apache.ratis.netty; -import org.apache.ratis.conf.ConfUtils; +import org.apache.ratis.conf.RaftProperties; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; - -import static org.apache.ratis.conf.ConfUtils.requireMax; -import static org.apache.ratis.conf.ConfUtils.requireMin; +import static org.apache.ratis.conf.ConfUtils.*; public interface NettyConfigKeys { String PREFIX = "raft.netty"; @@ -34,17 +30,17 @@ public interface NettyConfigKeys { String PORT_KEY = PREFIX + ".port"; int PORT_DEFAULT = 0; - static int port(BiFunction<String, Integer, Integer> getInt) { - return ConfUtils.getInt(getInt, + static int port(RaftProperties properties) { + return getInt(properties::getInt, PORT_KEY, PORT_DEFAULT, requireMin(0), requireMax(65536)); } - static void setPort(BiConsumer<String, Integer> setInt, int port) { - ConfUtils.setInt(setInt, PORT_KEY, port); + static void setPort(RaftProperties properties, int port) { + setInt(properties::setInt, PORT_KEY, port); } } static void main(String[] args) { - ConfUtils.printAll(NettyConfigKeys.class); + printAll(NettyConfigKeys.class); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 2d927db..7487219 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -115,7 
+115,7 @@ public final class NettyRpcService implements RaftServerRpc { } }; - final int port = NettyConfigKeys.Server.port(server.getProperties()::getInt); + final int port = NettyConfigKeys.Server.port(server.getProperties()); channelFuture = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 697aef6..126624d 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -37,7 +37,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { = new Factory<MiniRaftClusterWithNetty>() { @Override public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) { - RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.NETTY); + RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY); return new MiniRaftClusterWithNetty(ids, prop); } }; @@ -53,7 +53,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { protected RaftServerImpl newRaftServer( RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf, RaftProperties properties) throws IOException { - NettyConfigKeys.Server.setPort(properties::setInt, getPort(id, conf)); + NettyConfigKeys.Server.setPort(properties, getPort(id, conf)); return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java 
---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index 718c277..a307101 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -42,7 +42,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { @Override public MiniRaftClusterWithSimulatedRpc newCluster( String[] ids, RaftProperties prop) { - RaftConfigKeys.Rpc.setType(prop::set, SimulatedRpc.INSTANCE); + RaftConfigKeys.Rpc.setType(prop, SimulatedRpc.INSTANCE); if (ThreadLocalRandom.current().nextBoolean()) { // turn off simulate latency half of the times. prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0);
