This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 4332b0fe20234915765b5c16ed13883d6d651370 Author: Yaolong Liu <[email protected]> AuthorDate: Wed Sep 21 19:13:03 2022 +0800 Revert "RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)" This reverts commit 7a4543b1cedfb6b9b391ed6c6094b88fecacd546. --- .../ratis/client/api/GroupManagementApi.java | 12 +- .../apache/ratis/client/impl/ClientProtoUtils.java | 9 +- .../ratis/client/impl/GroupManagementImpl.java | 4 +- .../ratis/protocol/GroupManagementRequest.java | 13 +- .../apache/ratis/util/MemoizedCheckedSupplier.java | 90 ------------ .../java/org/apache/ratis/util/Preconditions.java | 5 - .../java/org/apache/ratis/util/SizeInBytes.java | 1 - ratis-proto/src/main/proto/Raft.proto | 1 - .../java/org/apache/ratis/server/RaftServer.java | 16 +-- .../apache/ratis/server/storage/RaftStorage.java | 5 +- .../apache/ratis/server/impl/RaftServerImpl.java | 9 +- .../apache/ratis/server/impl/RaftServerProxy.java | 26 ++-- .../apache/ratis/server/impl/ServerImplUtils.java | 5 +- .../org/apache/ratis/server/impl/ServerState.java | 128 +++++++++++------ .../server/storage/RaftStorageDirectoryImpl.java | 7 +- .../ratis/server/storage/RaftStorageImpl.java | 10 +- .../ratis/server/storage/SnapshotManager.java | 12 +- .../ratis/server/storage/StorageImplUtils.java | 160 +++------------------ .../apache/ratis/server/impl/MiniRaftCluster.java | 6 +- .../server/impl/RaftReconfigurationBaseTest.java | 4 +- .../TestServerState.java} | 27 ++-- .../ratis/server/storage/TestRaftStorage.java | 6 +- 22 files changed, 174 insertions(+), 382 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java index 558747048..1d3bc00b1 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java @@ -29,16 +29,8 @@ import java.io.IOException; * APIs to support group management operations such as add, remove, list and info to a particular server. */ public interface GroupManagementApi { - /** - * Add a new group. - * @param format Should it format the storage? - */ - RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException; - - /** The same as add(newGroup, true). */ - default RaftClientReply add(RaftGroup newGroup) throws IOException { - return add(newGroup, true); - } + /** Add a new group. */ + RaftClientReply add(RaftGroup newGroup) throws IOException; /** Remove a group. */ RaftClientReply remove(RaftGroupId groupId, boolean deleteDirectory, boolean renameDirectory) throws IOException; diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 1ac825850..859e1d4f0 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -571,9 +571,8 @@ public interface ClientProtoUtils { final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId()); switch(p.getOpCase()) { case GROUPADD: - final GroupAddRequestProto add = p.getGroupAdd(); return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(), - ProtoUtils.toRaftGroup(add.getGroup()), add.getFormat()); + ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup())); case GROUPREMOVE: final GroupRemoveRequestProto remove = p.getGroupRemove(); return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(), @@ -610,10 +609,8 @@ public interface ClientProtoUtils { .setRpcRequest(toRaftRpcRequestProtoBuilder(request)); final GroupManagementRequest.Add add = request.getAdd(); if (add != null) { - b.setGroupAdd(GroupAddRequestProto.newBuilder() - .setGroup(ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())) - .setFormat(add.isFormat()) - .build()); + b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup( + ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build()); } final GroupManagementRequest.Remove remove = request.getRemove(); if (remove != null) { diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java index 9501bc2ea..27e0bbffc 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java @@ -43,13 +43,13 @@ class GroupManagementImpl implements GroupManagementApi { } @Override - public RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException { + public RaftClientReply add(RaftGroup newGroup) throws IOException { Objects.requireNonNull(newGroup, "newGroup == null"); final long callId = CallId.getAndIncrement(); client.getClientRpc().addRaftPeers(newGroup.getPeers()); return client.io().sendRequestWithRetry( - () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup, format)); + () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup)); } @Override diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java index 2783d2c65..d370dfc4c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java @@ -26,11 +26,9 @@ public final class GroupManagementRequest extends RaftClientRequest { public static class Add extends Op { private final RaftGroup group; - private final boolean format; - public Add(RaftGroup group, boolean format) { + public Add(RaftGroup group) { this.group = group; - this.format = format; } @Override @@ -42,10 +40,6 @@ public final class GroupManagementRequest extends RaftClientRequest { return group; } - public boolean isFormat() { - return format; - } - @Override public String toString() { return JavaUtils.getClassSimpleName(getClass()) + ":" + getGroup(); @@ -85,9 +79,8 @@ public final class GroupManagementRequest extends RaftClientRequest { } } - public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, - RaftGroup group, boolean format) { - return new GroupManagementRequest(clientId, serverId, callId, new Add(group, format)); + public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) { + return new GroupManagementRequest(clientId, serverId, callId, new Add(group)); } public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId, diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java deleted file mode 100644 index cf2d06023..000000000 --- a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.util; - -import org.apache.ratis.util.function.CheckedSupplier; - -import java.util.Objects; - -/** - * A memoized supplier is a {@link CheckedSupplier} - * which gets a value by invoking its initializer once. - * and then keeps returning the same value as its supplied results. - * - * This class is thread safe. - * - * @param <RETURN> The return type of the supplier. - * @param <THROW> The throwable type of the supplier. - */ -public final class MemoizedCheckedSupplier<RETURN, THROW extends Throwable> - implements CheckedSupplier<RETURN, THROW> { - /** - * @param supplier to supply at most one non-null value. - * @return a {@link MemoizedCheckedSupplier} with the given supplier. - */ - public static <RETURN, THROW extends Throwable> MemoizedCheckedSupplier<RETURN, THROW> valueOf( - CheckedSupplier<RETURN, THROW> supplier) { - return supplier instanceof MemoizedCheckedSupplier ? - (MemoizedCheckedSupplier<RETURN, THROW>) supplier : new MemoizedCheckedSupplier<>(supplier); - } - - private final CheckedSupplier<RETURN, THROW> initializer; - private volatile RETURN value = null; - - /** - * Create a memoized supplier. - * @param initializer to supply at most one non-null value. - */ - private MemoizedCheckedSupplier(CheckedSupplier<RETURN, THROW> initializer) { - Objects.requireNonNull(initializer, "initializer == null"); - this.initializer = initializer; - } - - /** @return the lazily initialized object. */ - @Override - public RETURN get() throws THROW { - RETURN v = value; - if (v == null) { - synchronized (this) { - v = value; - if (v == null) { - v = value = Objects.requireNonNull(initializer.get(), "initializer.get() returns null"); - } - } - } - return v; - } - - /** - * @return the already initialized object. - * @throws NullPointerException if the object is uninitialized. - */ - public RETURN getUnchecked() { - return Objects.requireNonNull(value, "value == null"); - } - - /** @return is the object initialized? */ - public boolean isInitialized() { - return value != null; - } - - @Override - public String toString() { - return isInitialized()? "Memoized:" + value: "UNINITIALIZED"; - } -} diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java index 902c1f5e6..e9a002612 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java @@ -19,7 +19,6 @@ package org.apache.ratis.util; import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.function.Supplier; @@ -114,10 +113,6 @@ public interface Preconditions { return clazz.cast(object); } - static <K, V> void assertEmpty(Map<K, V> map, Object name) { - assertTrue(map.isEmpty(), () -> "The " + name + " map is non-empty: " + map); - } - static <T> void assertUnique(Iterable<T> first) { assertUnique(first, Collections.emptyList()); } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java index 683f0da62..25667a378 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java @@ -23,7 +23,6 @@ import java.util.Objects; * Size which may be constructed with a {@link TraditionalBinaryPrefix}. */ public final class SizeInBytes { - public static final SizeInBytes ZERO = valueOf(0); public static final SizeInBytes ONE_KB = valueOf("1k"); public static final SizeInBytes ONE_MB = valueOf("1m"); diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index 9fe2494bf..c571f0c73 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -469,7 +469,6 @@ message StartLeaderElectionReplyProto { // A request to add a new group message GroupAddRequestProto { RaftGroupProto group = 1; // the group to be added. - bool format = 2; // Should it format the storage? } message GroupRemoveRequestProto { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java index 8d00d29db..e9719b96c 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java @@ -167,8 +167,8 @@ public interface RaftServer extends Closeable, RpcType.Get, private static Method initNewRaftServerMethod() { final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils"; - final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, RaftStorage.StartupOption.class, - StateMachine.Registry.class, RaftProperties.class, Parameters.class}; + final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class, + RaftProperties.class, Parameters.class}; try { final Class<?> clazz = ReflectionUtils.getClassByName(className); return clazz.getMethod("newRaftServer", argClasses); @@ -177,12 +177,12 @@ public interface RaftServer extends Closeable, RpcType.Get, } } - private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, RaftStorage.StartupOption option, + private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) throws IOException { try { return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null, - serverId, group, option, stateMachineRegistry, properties, parameters); + serverId, group, stateMachineRegistry, properties, parameters); } catch (IllegalAccessException e) { throw new IllegalStateException("Failed to build " + serverId, e); } catch (InvocationTargetException e) { @@ -193,7 +193,6 @@ public interface RaftServer extends Closeable, RpcType.Get, private RaftPeerId serverId; private StateMachine.Registry stateMachineRegistry ; private RaftGroup group = null; - private RaftStorage.StartupOption option = RaftStorage.StartupOption.RECOVER; private RaftProperties properties; private Parameters parameters; @@ -202,7 +201,6 @@ public interface RaftServer extends Closeable, RpcType.Get, return newRaftServer( serverId, group, - option, Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " + "is initialized."), Objects.requireNonNull(properties, "The 'properties' field is not initialized."), @@ -232,12 +230,6 @@ public interface RaftServer extends Closeable, RpcType.Get, return this; } - /** Set the startup option for the group. */ - public Builder setOption(RaftStorage.StartupOption option) { - this.option = option; - return this; - } - /** Set {@link RaftProperties}. */ public Builder setProperties(RaftProperties properties) { this.properties = properties; diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java index 59d87c37a..dde3c31bc 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java @@ -62,7 +62,8 @@ public interface RaftStorage extends Closeable { private static Method initNewRaftStorageMethod() { final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils"; - final Class<?>[] argClasses = {File.class, SizeInBytes.class, StartupOption.class, CorruptionPolicy.class}; + //final String className = "org.apache.ratis.server.storage.RaftStorageImpl"; + final Class<?>[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class }; try { final Class<?> clazz = ReflectionUtils.getClassByName(className); return clazz.getMethod("newRaftStorage", argClasses); @@ -75,7 +76,7 @@ public interface RaftStorage extends Closeable { StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException { try { return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null, - dir, storageFreeSpaceMin, option, logCorruptionPolicy); + dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize()); } catch (IllegalAccessException e) { throw new IllegalStateException("Failed to build " + dir, e); } catch (InvocationTargetException e) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index ee7b749dd..1994fa8de 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -189,8 +189,7 @@ class RaftServerImpl implements RaftServer.Division, private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); - RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) - throws IOException { + RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException { final RaftPeerId id = proxy.getId(); LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine); this.lifeCycle = new LifeCycle(id); @@ -203,7 +202,7 @@ class RaftServerImpl implements RaftServer.Division, this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties); this.proxy = proxy; - this.state = new ServerState(id, group, stateMachine, this, option, properties); + this.state = new ServerState(id, group, properties, this, stateMachine); this.retryCache = new RetryCacheImpl(properties); this.dataStreamMap = new DataStreamMapImpl(id); @@ -576,8 +575,8 @@ class RaftServerImpl implements RaftServer.Division, } GroupInfoReply getGroupInfo(GroupInfoRequest request) { - final RaftStorageDirectory dir = state.getStorage().getStorageDir(); - return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), dir.isHealthy()); + return new GroupInfoReply(request, getCommitInfos(), + getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy()); } RoleInfoProto getRoleInfoProto() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 0825e7d12..b8cee7f53 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -38,7 +38,6 @@ import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.DataStreamServerRpc; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.ServerFactory; -import org.apache.ratis.server.storage.RaftStorage.StartupOption; import org.apache.ratis.util.ConcurrentUtils; import org.apache.ratis.util.JvmPauseMonitor; import org.apache.ratis.server.RaftServerConfigKeys; @@ -81,7 +80,7 @@ class RaftServerProxy implements RaftServer { private final ConcurrentMap<RaftGroupId, CompletableFuture<RaftServerImpl>> map = new ConcurrentHashMap<>(); private boolean isClosed = false; - synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group, StartupOption option) { + synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group) { if (isClosed) { return JavaUtils.completeExceptionally(new AlreadyClosedException( getId() + ": Failed to add " + group + " since the server is already closed")); @@ -91,7 +90,7 @@ class RaftServerProxy implements RaftServer { getId() + ": Failed to add " + group + " since the group already exists in the map.")); } final RaftGroupId groupId = group.getGroupId(); - final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group, option); + final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group); final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl); Preconditions.assertNull(previous, "previous"); LOG.info("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl)); @@ -231,7 +230,7 @@ class RaftServerProxy implements RaftServer { } /** Check the storage dir and add groups*/ - void initGroups(RaftGroup group, StartupOption option) { + void initGroups(RaftGroup group) { final Optional<RaftGroup> raftGroup = Optional.ofNullable(group); final RaftGroupId raftGroupId = raftGroup.map(RaftGroup::getGroupId).orElse(null); final Predicate<RaftGroupId> shouldAdd = gid -> gid != null && !gid.equals(raftGroupId); @@ -242,7 +241,7 @@ class RaftServerProxy implements RaftServer { .filter(File::isDirectory) .forEach(sub -> initGroupDir(sub, shouldAdd)), executor).join(); - raftGroup.ifPresent(g -> addGroup(g, option)); + raftGroup.ifPresent(this::addGroup); } private void initGroupDir(File sub, Predicate<RaftGroupId> shouldAdd) { @@ -256,7 +255,7 @@ class RaftServerProxy implements RaftServer { " ignoring it. ", getId(), sub.getAbsolutePath()); } if (shouldAdd.test(groupId)) { - addGroup(RaftGroup.valueOf(groupId), StartupOption.RECOVER); + addGroup(RaftGroup.valueOf(groupId)); } } catch (Exception e) { LOG.warn(getId() + ": Failed to initialize the group directory " @@ -270,11 +269,11 @@ class RaftServerProxy implements RaftServer { getDataStreamServerRpc().addRaftPeers(others); } - private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group, StartupOption option) { + private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) { return CompletableFuture.supplyAsync(() -> { try { addRaftPeers(group.getPeers()); - return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this, option); + return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this); } catch(IOException e) { throw new CompletionException(getId() + ": Failed to initialize server for " + group, e); } @@ -342,8 +341,8 @@ class RaftServerProxy implements RaftServer { return dataStreamServerRpc; } - private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group, StartupOption option) { - return impls.addNew(group, option); + private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) { + return impls.addNew(group); } private CompletableFuture<RaftServerImpl> getImplFuture(RaftGroupId groupId) { @@ -467,7 +466,7 @@ class RaftServerProxy implements RaftServer { } final GroupManagementRequest.Add add = request.getAdd(); if (add != null) { - return groupAddAsync(request, add.getGroup(), add.isFormat()); + return groupAddAsync(request, add.getGroup()); } final GroupManagementRequest.Remove remove = request.getRemove(); if (remove != null) { @@ -478,13 +477,12 @@ class RaftServerProxy implements RaftServer { getId() + ": Request not supported " + request)); } - private CompletableFuture<RaftClientReply> groupAddAsync( - GroupManagementRequest request, RaftGroup newGroup, boolean format) { + private CompletableFuture<RaftClientReply> groupAddAsync(GroupManagementRequest request, RaftGroup newGroup) { if (!request.getRaftGroupId().equals(newGroup.getGroupId())) { return JavaUtils.completeExceptionally(new GroupMismatchException( getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup)); } - return impls.addNew(newGroup, format? StartupOption.FORMAT: StartupOption.RECOVER) + return impls.addNew(newGroup) .thenApplyAsync(newImpl -> { LOG.debug("{}: newImpl = {}", getId(), newImpl); try { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 6e1ddd548..6777b9093 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -26,7 +26,6 @@ import org.apache.ratis.server.RaftConfiguration; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; -import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; @@ -46,7 +45,7 @@ public final class ServerImplUtils { /** Create a {@link RaftServerProxy}. */ public static RaftServerProxy newRaftServer( - RaftPeerId id, RaftGroup group, RaftStorage.StartupOption option, StateMachine.Registry stateMachineRegistry, + RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) throws IOException { RaftServer.LOG.debug("newRaftServer: {}, {}", id, group); if (group != null && !group.getPeers().isEmpty()) { @@ -54,7 +53,7 @@ public final class ServerImplUtils { Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group); } final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters); - proxy.initGroups(group, option); + proxy.initGroups(group); return proxy; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index e92f9b911..1ee2cab5e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -36,14 +36,20 @@ import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.MemoizedCheckedSupplier; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; +import java.io.Closeable; +import java.io.File; import java.io.IOException; +import java.nio.channels.OverlappingFileLockException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -57,7 +63,7 @@ import static org.apache.ratis.server.RaftServer.Division.LOG; /** * Common states of a raft peer. Protected by RaftServer's lock. */ -class ServerState { +class ServerState implements Closeable { private final RaftGroupMemberId memberId; private final RaftServerImpl server; /** Raft log */ @@ -67,7 +73,7 @@ class ServerState { /** The thread that applies committed log entries to the state machine */ private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater; /** local storage for log and snapshot */ - private final MemoizedCheckedSupplier<RaftStorageImpl, IOException> raftStorage; + private RaftStorageImpl storage; private final SnapshotManager snapshotManager; private volatile Timestamp lastNoLeaderTime; private final TimeDuration noLeaderTimeout; @@ -94,8 +100,9 @@ class ServerState { */ private final AtomicReference<TermIndex> latestInstalledSnapshot = new AtomicReference<>(); - ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server, - RaftStorage.StartupOption option, RaftProperties prop) { + ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop, + RaftServerImpl server, StateMachine stateMachine) + throws IOException { this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId()); this.server = server; Collection<RaftPeer> followerPeers = group.getPeers().stream() @@ -110,11 +117,36 @@ class ServerState { configurationManager = new ConfigurationManager(initialConf); LOG.info("{}: {}", getMemberId(), configurationManager); - final String storageDirName = group.getGroupId().getUuid().toString(); - this.raftStorage = MemoizedCheckedSupplier.valueOf( - () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop)); + boolean storageFound = false; + List<File> directories = RaftServerConfigKeys.storageDir(prop); + while (!directories.isEmpty()) { + // use full uuid string to create a subdirectory + File dir = chooseStorageDir(directories, group.getGroupId().getUuid().toString()); + try { + storage = (RaftStorageImpl) RaftStorage.newBuilder() + .setDirectory(dir) + .setOption(RaftStorage.StartupOption.RECOVER) + .setLogCorruptionPolicy(RaftServerConfigKeys.Log.corruptionPolicy(prop)) + .setStorageFreeSpaceMin(RaftServerConfigKeys.storageFreeSpaceMin(prop)) + .build(); + storageFound = true; + break; + } catch (IOException e) { + if (e.getCause() instanceof OverlappingFileLockException) { + throw e; + } + LOG.warn("Failed to init RaftStorage under {} for {}: {}", + dir.getParent(), group.getGroupId().getUuid().toString(), e); + directories.removeIf(d -> d.getAbsolutePath().equals(dir.getParent())); + } + } + + if (!storageFound) { + throw new IOException("No healthy directories found for RaftStorage among: " + + RaftServerConfigKeys.storageDir(prop)); + } - this.snapshotManager = StorageImplUtils.newSnapshotManager(id); + snapshotManager = new SnapshotManager(storage, id); // On start the leader is null, start the clock now this.leaderId = null; @@ -131,8 +163,7 @@ class ServerState { } void initialize(StateMachine stateMachine) throws IOException { - // initialize raft storage - final RaftStorageImpl storage = raftStorage.get(); + storage.initialize(); // read configuration from the storage Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf); @@ -149,8 +180,32 @@ class ServerState { return memberId; } + static File chooseStorageDir(List<File> volumes, String targetSubDir) throws IOException { + final Map<File, Integer> numberOfStorageDirPerVolume = new HashMap<>(); + final File[] empty = {}; + final List<File> resultList = new ArrayList<>(); + volumes.stream().flatMap(volume -> { + final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(empty); + numberOfStorageDirPerVolume.put(volume, dirs.length); + return Arrays.stream(dirs); + }).filter(dir -> targetSubDir.equals(dir.getName())) + .forEach(resultList::add); + + if (resultList.size() > 1) { + throw new IOException("More than one directories found for " + targetSubDir + ": " + resultList); + } + if (resultList.size() == 1) { + return resultList.get(0); + } + return numberOfStorageDirPerVolume.entrySet().stream() + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .map(v -> new File(v, targetSubDir)) + .orElseThrow(() -> new IOException("No storage directory found.")); + } + void writeRaftConfiguration(LogEntryProto conf) { - getStorage().writeRaftConfiguration(conf); + storage.writeRaftConfiguration(conf); } void start() { @@ -159,8 +214,7 @@ class ServerState { private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) { try { - return initRaftLog(getMemberId(), server, getStorage(), this::setRaftConf, - getSnapshotIndexFromStateMachine, prop); + return initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop); } catch (IOException e) { throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e); } @@ -205,6 +259,10 @@ class ServerState { return leaderId; } + boolean hasLeader() { + return leaderId != null; + } + /** * Become a candidate and start leader election */ @@ -376,7 +434,7 @@ class ServerState { void updateConfiguration(List<LogEntryProto> entries) { if (entries != null && !entries.isEmpty()) { configurationManager.removeConfigurations(entries.get(0).getIndex()); - entries.forEach(this::setRaftConf); + entries.stream().forEach(this::setRaftConf); } } @@ -397,45 +455,29 @@ class ServerState { getStateMachineUpdater().reloadStateMachine(); } - void close() { - try { - if (stateMachineUpdater.isInitialized()) { - getStateMachineUpdater().stopAndJoin(); - } - } catch (Throwable e) { - LOG.warn(getMemberId() + ": Failed to join " + getStateMachineUpdater(), e); - } - LOG.info("{}: applyIndex: {}", getMemberId(), getLastAppliedIndex()); - + @Override + public void close() throws IOException { try { - if (log.isInitialized()) { - getLog().close(); - } - } catch (Throwable e) { - LOG.warn(getMemberId() + ": Failed to close raft log " + getLog(), e); + getStateMachineUpdater().stopAndJoin(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e); } + LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex()); - try { - if (raftStorage.isInitialized()) { - getStorage().close(); - } - } catch (Throwable e) { - LOG.warn(getMemberId() + ": Failed to close raft storage " + getStorage(), e); - } + getLog().close(); + storage.close(); } - RaftStorageImpl getStorage() { - if (!raftStorage.isInitialized()) { - throw new IllegalStateException(getMemberId() + ": raftStorage is uninitialized."); - } - return raftStorage.getUnchecked(); + RaftStorage getStorage() { + return storage; } void installSnapshot(InstallSnapshotRequestProto request) throws IOException { // TODO: verify that we need to install the snapshot StateMachine sm = server.getStateMachine(); sm.pause(); // pause the SM to prepare for install snapshot - snapshotManager.installSnapshot(request, sm, getStorage().getStorageDir()); + snapshotManager.installSnapshot(sm, request); updateInstalledSnapshotIndex(TermIndex.valueOf(request.getSnapshotChunk().getTermIndex())); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java index a86cdf56b..a7fa13b53 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java @@ -19,7 +19,6 @@ package org.apache.ratis.server.storage; import org.apache.ratis.util.AtomicFileOutputStream; import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.SizeInBytes; import java.io.File; import java.io.IOException; @@ -52,13 +51,13 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory { private final File root; // root directory private FileLock lock; // storage lock - private final SizeInBytes freeSpaceMin; + private long freeSpaceMin; /** * Constructor * @param dir directory corresponding to the storage */ - RaftStorageDirectoryImpl(File dir, SizeInBytes freeSpaceMin) { + RaftStorageDirectoryImpl(File dir, long freeSpaceMin) { this.root = dir; this.lock = null; this.freeSpaceMin = freeSpaceMin; @@ -178,7 +177,7 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory { } private boolean hasEnoughSpace() { - return root.getFreeSpace() >= freeSpaceMin.getSize(); + return root.getFreeSpace() > freeSpaceMin; } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java index 56972c3f7..6513efd30 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java @@ -23,7 +23,6 @@ import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState; import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.SizeInBytes; import java.io.File; import java.io.FileNotFoundException; @@ -35,16 +34,17 @@ import java.util.Optional; /** The storage of a {@link org.apache.ratis.server.RaftServer}. */ public class RaftStorageImpl implements RaftStorage { + + // TODO support multiple storage directories private final RaftStorageDirectoryImpl storageDir; private final StartupOption startupOption; private final CorruptionPolicy logCorruptionPolicy; private volatile StorageState state = StorageState.UNINITIALIZED; private volatile RaftStorageMetadataFileImpl metaFile; - RaftStorageImpl(File dir, SizeInBytes freeSpaceMin, StartupOption option, CorruptionPolicy logCorruptionPolicy) { - LOG.debug("newRaftStorage: {}, freeSpaceMin={}, option={}, logCorruptionPolicy={}", - dir, freeSpaceMin, option, logCorruptionPolicy); - this.storageDir = new RaftStorageDirectoryImpl(dir, freeSpaceMin); + RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option, + long storageFeeSpaceMin) { + this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin); this.logCorruptionPolicy = Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault); this.startupOption = option; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index 17294b572..aaa62a783 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -54,17 +54,21 @@ public class SnapshotManager { private static final String CORRUPT = ".corrupt"; private static final String TMP = ".tmp"; + private final RaftStorage storage; private final RaftPeerId selfId; private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester); - SnapshotManager(RaftPeerId selfId) { + public SnapshotManager(RaftStorage storage, RaftPeerId selfId) { + this.storage = storage; this.selfId = selfId; } - public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir) - throws IOException { - final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); + public void installSnapshot(StateMachine stateMachine, + InstallSnapshotRequestProto request) throws IOException { + final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = + request.getSnapshotChunk(); final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex(); + final RaftStorageDirectory dir = storage.getStorageDir(); // create a unique temporary directory final File tmpDir = new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId()); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java index 865e2b2b1..aeff60148 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java @@ -17,158 +17,38 @@ */ package org.apache.ratis.server.storage; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.RaftServerConfigKeys.Log; -import org.apache.ratis.server.storage.RaftStorage.StartupOption; -import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.TimeDuration; import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - -import static org.apache.ratis.server.RaftServer.Division.LOG; +import java.util.concurrent.TimeUnit; public final class StorageImplUtils { - private static final File[] EMPTY_FILE_ARRAY = {}; private StorageImplUtils() { //Never constructed } - public static SnapshotManager newSnapshotManager(RaftPeerId id) { - return new SnapshotManager(id); - } - /** Create a {@link RaftStorageImpl}. */ - public static RaftStorageImpl newRaftStorage(File dir, SizeInBytes freeSpaceMin, - RaftStorage.StartupOption option, Log.CorruptionPolicy logCorruptionPolicy) { - return new RaftStorageImpl(dir, freeSpaceMin, option, logCorruptionPolicy); - } - - /** @return a list of existing subdirectories matching the given storage directory name from the given volumes. */ - static List<File> getExistingStorageSubs(List<File> volumes, String targetSubDir, Map<File, Integer> dirsPerVol) { - return volumes.stream().flatMap(volume -> { - final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(EMPTY_FILE_ARRAY); - Optional.ofNullable(dirsPerVol).ifPresent(map -> map.put(volume, dirs.length)); - return Arrays.stream(dirs); - }).filter(dir -> targetSubDir.equals(dir.getName())) - .collect(Collectors.toList()); - } - - /** @return a volume with the min dirs. */ - static File chooseMin(Map<File, Integer> dirsPerVol) throws IOException { - return dirsPerVol.entrySet().stream() - .min(Map.Entry.comparingByValue()) - .map(Map.Entry::getKey) - .orElseThrow(() -> new IOException("No storage directory found.")); - } - - /** - * Choose a {@link RaftStorage} for the given storage directory name from the given configuration properties - * and then try to call {@link RaftStorage#initialize()}. - * <p /> - * {@link StartupOption#FORMAT}: - * - When there are more than one existing directories, throw an exception. - * - When there is an existing directory, throw an exception. - * - When there is no existing directory, try to initialize a new directory from the list specified - * in the configuration properties until a directory succeeded or all directories failed. - * <p /> - * {@link StartupOption#RECOVER}: - * - When there are more than one existing directories, throw an exception. - * - When there is an existing directory, if it fails to initialize, throw an exception but not try a new directory. - * - When there is no existing directory, throw an exception. - * - * @param storageDirName the storage directory name - * @param option the startup option - * @param properties the configuration properties - * @return the chosen storage, which is initialized successfully. - */ - public static RaftStorageImpl initRaftStorage(String storageDirName, StartupOption option, - RaftProperties properties) throws IOException { - return new Op(storageDirName, option, properties).run(); - } - - private static class Op { - private final String storageDirName; - private final StartupOption option; - - private final SizeInBytes freeSpaceMin; - private final Log.CorruptionPolicy logCorruptionPolicy; - private final List<File> dirsInConf; - - private final List<File> existingSubs; - private final Map<File, Integer> dirsPerVol = new HashMap<>(); - - Op(String storageDirName, StartupOption option, RaftProperties properties) { - this.storageDirName = storageDirName; - this.option = option; - - this.freeSpaceMin = RaftServerConfigKeys.storageFreeSpaceMin(properties); - this.logCorruptionPolicy = RaftServerConfigKeys.Log.corruptionPolicy(properties); - this.dirsInConf = RaftServerConfigKeys.storageDir(properties); - - this.existingSubs = getExistingStorageSubs(dirsInConf, this.storageDirName, dirsPerVol); - } - - RaftStorageImpl run() throws IOException { - if (option == StartupOption.FORMAT) { - return format(); - } else if (option == StartupOption.RECOVER) { - return recover(); - } else { - throw new IllegalArgumentException("Illegal option: " + option); - } - } - - private RaftStorageImpl format() throws IOException { - if (!existingSubs.isEmpty()) { - throw new IOException("Failed to " + option + ": One or more existing directories found " + existingSubs - + " for " + storageDirName); - } - - for (; !dirsPerVol.isEmpty(); ) { - final File vol = chooseMin(dirsPerVol); - final File dir = new File(vol, storageDirName); - try { - final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.FORMAT, logCorruptionPolicy); - storage.initialize(); - return storage; - } catch (Throwable e) { - LOG.warn("Failed to initialize a new directory " + dir.getAbsolutePath(), e); - dirsPerVol.remove(vol); - } - } - throw new IOException("Failed to FORMAT a new storage dir for " + storageDirName + " from " + dirsInConf); - } - - private RaftStorageImpl recover() throws IOException { - final int size = existingSubs.size(); - if (size > 1) { - throw new IOException("Failed to " + option + ": More than one existing directories found " - + existingSubs + " for " + storageDirName); - } else if (size == 0) { - throw new IOException("Failed to " + option + ": Storage directory not found for " - + storageDirName + " from " + dirsInConf); - } - - final File dir = existingSubs.get(0); - try { - final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.RECOVER, logCorruptionPolicy); - storage.initialize(); - return storage; - } catch (Throwable e) { - if (e instanceof IOException) { - throw e; - } - throw new IOException("Failed to initialize the existing directory " + dir.getAbsolutePath(), e); - } + public static RaftStorageImpl newRaftStorage(File dir, RaftServerConfigKeys.Log.CorruptionPolicy logCorruptionPolicy, + RaftStorage.StartupOption option, long storageFeeSpaceMin) throws IOException { + RaftStorage.LOG.debug("newRaftStorage: {}, {}, {}, {}",dir, logCorruptionPolicy, option, storageFeeSpaceMin); + + final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); + final RaftStorageImpl raftStorage; + try { + // attempt multiple times to avoid temporary bind exception + raftStorage = JavaUtils.attemptRepeatedly( + () -> new RaftStorageImpl(dir, logCorruptionPolicy, option, storageFeeSpaceMin), + 5, sleepTime, "new RaftStorageImpl", RaftStorage.LOG); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw IOUtils.toInterruptedIOException( + "Interrupted when creating RaftStorage " + dir, e); } + return raftStorage; } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index dfed96952..1f4047524 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -40,7 +40,6 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.ServerFactory; import org.apache.ratis.server.raftlog.memory.MemoryRaftLog; import org.apache.ratis.server.raftlog.RaftLog; -import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.util.CollectionUtils; @@ -386,9 +385,8 @@ public abstract class MiniRaftCluster implements Closeable { } final RaftProperties prop = new RaftProperties(properties); RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir)); - return ServerImplUtils.newRaftServer(id, group, - format? RaftStorage.StartupOption.FORMAT: RaftStorage.StartupOption.RECOVER, - getStateMachineRegistry(prop), prop, setPropertiesAndInitParameters(id, group, prop)); + return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop, + setPropertiesAndInitParameters(id, group, prop)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 5e6353f92..e09ca19d1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -365,7 +365,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste // start the two new peers LOG.info("Start new peers"); for (RaftPeer np : c1.newPeers) { - cluster.restartServer(np.getId(), true); + cluster.restartServer(np.getId(), false); } Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess()); } @@ -504,7 +504,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste LOG.info("start new peers: {}", Arrays.asList(c1.newPeers)); for (RaftPeer np : c1.newPeers) { - cluster.restartServer(np.getId(), true); + cluster.restartServer(np.getId(), false); } try { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java similarity index 82% rename from ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java rename to ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java index ff38a6bd9..75aef53a1 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.ratis.server.storage; +package org.apache.ratis.server.impl; import org.apache.ratis.BaseTest; import org.apache.ratis.util.FileUtils; @@ -28,9 +28,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; @@ -39,20 +37,13 @@ import java.util.stream.IntStream; /** * Test cases to verify ServerState. */ -public class TestStorageImplUtils { +public class TestServerState { private static final Supplier<File> rootTestDir = JavaUtils.memoize( () -> new File(BaseTest.getRootTestDir(), - JavaUtils.getClassSimpleName(TestStorageImplUtils.class) + + JavaUtils.getClassSimpleName(TestServerState.class) + Integer.toHexString(ThreadLocalRandom.current().nextInt()))); - static File chooseNewStorageDir(List<File> volumes, String sub) throws IOException { - final Map<File, Integer> numDirPerVolume = new HashMap<>(); - StorageImplUtils.getExistingStorageSubs(volumes, sub, numDirPerVolume); - final File vol = StorageImplUtils.chooseMin(numDirPerVolume); - return new File(vol, sub); - } - @AfterClass public static void tearDown() throws IOException { FileUtils.deleteFully(rootTestDir.get()); @@ -69,8 +60,8 @@ public class TestStorageImplUtils { List<File> directories = Collections.singletonList(testDir); String subDirOne = UUID.randomUUID().toString(); String subDirTwo = UUID.randomUUID().toString(); - final File storageDirOne = chooseNewStorageDir(directories, subDirOne); - final File storageDirTwo = chooseNewStorageDir(directories, subDirTwo); + File storageDirOne = ServerState.chooseStorageDir(directories, subDirOne); + File storageDirTwo = ServerState.chooseStorageDir(directories, subDirTwo); File expectedOne = new File(testDir, subDirOne); File expectedTwo = new File(testDir, subDirTwo); Assert.assertEquals(expectedOne.getCanonicalPath(), @@ -109,7 +100,7 @@ public class TestStorageImplUtils { } }); String subDir = UUID.randomUUID().toString(); - final File storageDirectory = chooseNewStorageDir(directories, subDir); + File storageDirectory = ServerState.chooseStorageDir(directories, subDir); File expected = new File(directories.get(6), subDir); Assert.assertEquals(expected.getCanonicalPath(), storageDirectory.getCanonicalPath()); @@ -117,15 +108,19 @@ public class TestStorageImplUtils { /** * Tests choosing of storage directory when only no volume is configured. + * + * @throws IOException in case of exception. */ @Test public void testChooseStorageDirWithNoVolume() { try { - chooseNewStorageDir(Collections.emptyList(), UUID.randomUUID().toString()); + ServerState.chooseStorageDir( + Collections.emptyList(), UUID.randomUUID().toString()); Assert.fail(); } catch (IOException ex) { String expectedErrMsg = "No storage directory found."; Assert.assertEquals(expectedErrMsg, ex.getMessage()); } } + } \ No newline at end of file diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java index 7027bd8ea..ffc8de597 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java @@ -114,7 +114,7 @@ public class TestRaftStorage extends BaseTest { */ @Test public void testStorage() throws Exception { - final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO); + final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0); try { StorageState state = sd.analyzeStorage(true); Assert.assertEquals(StorageState.NOT_FORMATTED, state); @@ -171,7 +171,7 @@ public class TestRaftStorage extends BaseTest { Assert.assertEquals(StorageState.NORMAL, storage.getState()); storage.close(); - final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO); + final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0); File metaFile = sd.getMetaFile(); FileUtils.move(metaFile, sd.getMetaTmpFile()); @@ -286,7 +286,7 @@ public class TestRaftStorage extends BaseTest { File mockStorageDir = Mockito.spy(storageDir); Mockito.when(mockStorageDir.getFreeSpace()).thenReturn(100L); // 100B - final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, SizeInBytes.valueOf("100M")); + final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, 104857600); // 100MB StorageState state = sd.analyzeStorage(false); Assert.assertEquals(StorageState.NO_SPACE, state); }
