This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ada2c1cc0 RATIS-1855 Fix some sonar code smell and bugs in
ratis-server (#892)
ada2c1cc0 is described below
commit ada2c1cc0b80ea01f65f1debfea12818373243e2
Author: Potato <[email protected]>
AuthorDate: Sat Jul 8 09:28:25 2023 +0800
RATIS-1855 Fix some sonar code smell and bugs in ratis-server (#892)
---
.../main/java/org/apache/ratis/grpc/GrpcUtil.java | 2 +-
.../apache/ratis/server/raftlog/LogProtoUtils.java | 8 +++---
.../apache/ratis/server/raftlog/RaftLogBase.java | 14 ++++++----
.../raftlog/segmented/BufferedWriteChannel.java | 2 ++
.../raftlog/segmented/SegmentedRaftLogWorker.java | 1 +
.../server/storage/RaftStorageDirectoryImpl.java | 18 ++++++-------
.../ratis/server/storage/RaftStorageImpl.java | 30 ++++++++++++++++------
.../ratis/server/storage/SnapshotManager.java | 6 ++---
.../ratis/server/storage/StorageImplUtils.java | 10 +++++---
9 files changed, 58 insertions(+), 33 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 8997c6dec..ee1c28dd3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -236,7 +236,7 @@ public interface GrpcUtil {
if (!managedChannel.awaitTermination(2, TimeUnit.SECONDS)) {
LOG.warn("Timed out forcefully shutting down connection: {}. ",
managedChannel);
}
- }catch (InterruptedException e) {
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Unexpected exception while waiting for channel
termination", e);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index d84c35eb0..b75777c9e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
@@ -52,8 +53,9 @@ public final class LogProtoUtils {
s = "(c:" + metadata.getCommitIndex() + ")";
} else if (entry.hasConfigurationEntry()) {
final RaftConfigurationProto config = entry.getConfigurationEntry();
- s = "(current:" + config.getPeersList().stream().map(p ->
p.toString()).collect(Collectors.joining(",")) +
- ", old:" + config.getOldPeersList().stream().map(p ->
p.toString()).collect(Collectors.joining(",")) + ")";
+ s = "(current:" +
config.getPeersList().stream().map(AbstractMessage::toString).collect(Collectors.joining(","))
+
+ ", old:" +
config.getOldPeersList().stream().map(AbstractMessage::toString).collect(Collectors.joining(","))
+ + ")";
} else {
s = "";
}
@@ -71,7 +73,7 @@ public final class LogProtoUtils {
public static String toLogEntriesShortString(List<LogEntryProto> entries) {
return entries == null ? null
- : entries.size() == 0 ? "<empty>"
+ : entries.isEmpty()? "<empty>"
: "size=" + entries.size() + ", first=" +
LogProtoUtils.toLogEntryString(entries.get(0));
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 11eddb927..579d62cf5 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.raftlog;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -78,7 +79,7 @@ public abstract class RaftLogBase implements RaftLog {
private final TimeDuration stateMachineDataReadTimeout;
private final long purgePreservation;
- private volatile LogEntryProto lastMetadataEntry = null;
+ private final AtomicReference<LogEntryProto> lastMetadataEntry = new
AtomicReference<>();
protected RaftLogBase(RaftGroupMemberId memberId,
LongSupplier getSnapshotIndexFromStateMachine,
@@ -207,7 +208,7 @@ public abstract class RaftLogBase implements RaftLog {
entry = LogProtoUtils.toLogEntryProto(newCommitIndex, term, nextIndex);
appendEntry(entry);
}
- lastMetadataEntry = entry;
+ lastMetadataEntry.set(entry);
return nextIndex;
}
@@ -215,7 +216,7 @@ public abstract class RaftLogBase implements RaftLog {
if (newCommitIndex <= 0) {
// do not log the first conf entry
return false;
- } else if (Optional.ofNullable(lastMetadataEntry)
+ } else if (Optional.ofNullable(lastMetadataEntry.get())
.filter(e -> e.getIndex() == newCommitIndex ||
e.getMetadataEntry().getCommitIndex() >= newCommitIndex)
.isPresent()) {
//log neither lastMetadataEntry, nor entries with a smaller commit index.
@@ -250,12 +251,12 @@ public abstract class RaftLogBase implements RaftLog {
public final void open(long lastIndexInSnapshot, Consumer<LogEntryProto>
consumer) throws IOException {
openImpl(lastIndexInSnapshot, e -> {
if (e.hasMetadataEntry()) {
- lastMetadataEntry = e;
+ lastMetadataEntry.set(e);
} else if (consumer != null) {
consumer.accept(e);
}
});
- Optional.ofNullable(lastMetadataEntry).ifPresent(
+ Optional.ofNullable(lastMetadataEntry.get()).ifPresent(
e -> commitIndex.updateToMax(e.getMetadataEntry().getCommitIndex(),
infoIndexChange));
state.open();
@@ -413,6 +414,9 @@ public abstract class RaftLogBase implements RaftLog {
}
throw t;
} catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
final String err = getName() + ": Failed readStateMachineData for " +
toLogEntryString(logEntry);
LOG.error(err, e);
throw new RaftLogIOException(err,
JavaUtils.unwrapCompletionException(e));
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
index ef9987ff7..fd06a2b37 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/BufferedWriteChannel.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicReference;
* This class is NOT threadsafe.
*/
class BufferedWriteChannel implements Closeable {
+
+ @SuppressWarnings("java:S2095") // return Closable
static BufferedWriteChannel open(File file, boolean append, ByteBuffer
buffer) throws IOException {
final RandomAccessFile raf = new RandomAccessFile(file, "rw");
final FileChannel fc = raf.getChannel();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index c867914fe..7928f2297 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -273,6 +273,7 @@ class SegmentedRaftLogWorker {
if (e instanceof InterruptedException && !running) {
LOG.info("Got InterruptedException when adding task " + task
+ ". The SegmentedRaftLogWorker already stopped.");
+ Thread.currentThread().interrupt();
} else {
LOG.error("Failed to add IO task {}", task, e);
Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
index a86cdf56b..8cf7dd326 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
@@ -87,7 +87,7 @@ class RaftStorageDirectoryImpl implements
RaftStorageDirectory {
private static void clearDirectory(File dir) throws IOException {
if (dir.exists()) {
- LOG.info(dir + " already exists. Deleting it ...");
+ LOG.info("{} already exists. Deleting it ...", dir);
FileUtils.deleteFully(dir);
}
FileUtils.createDirectories(dir);
@@ -135,16 +135,16 @@ class RaftStorageDirectoryImpl implements
RaftStorageDirectory {
String rootPath = root.getCanonicalPath();
try { // check that storage exists
if (!root.exists()) {
- LOG.info("The storage directory " + rootPath + " does not exist.
Creating ...");
+ LOG.info("The storage directory {} does not exist. Creating ...",
rootPath);
FileUtils.createDirectories(root);
}
// or is inaccessible
if (!root.isDirectory()) {
- LOG.warn(rootPath + " is not a directory");
+ LOG.warn("{} is not a directory", rootPath);
return StorageState.NON_EXISTENT;
}
if (!Files.isWritable(root.toPath())) {
- LOG.warn("The storage directory " + rootPath + " is not writable.");
+ LOG.warn("The storage directory {} is not writable.", rootPath);
return StorageState.NON_EXISTENT;
}
} catch(SecurityException ex) {
@@ -158,9 +158,9 @@ class RaftStorageDirectoryImpl implements
RaftStorageDirectory {
// check enough space
if (!hasEnoughSpace()) {
- LOG.warn("There are not enough space left for directory " + rootPath
- + " free space min required: " + freeSpaceMin
- + " free space actual: " + root.getFreeSpace());
+ LOG.warn("There are not enough space left for directory {}"
+ + " free space min required: {} free space actual: {}",
+ rootPath, freeSpaceMin, root.getFreeSpace());
return StorageState.NO_SPACE;
}
@@ -225,11 +225,11 @@ class RaftStorageDirectoryImpl implements
RaftStorageDirectory {
try {
res = file.getChannel().tryLock();
if (null == res) {
- LOG.error("Unable to acquire file lock on path " + lockF.toString());
+ LOG.error("Unable to acquire file lock on path {}", lockF);
throw new OverlappingFileLockException();
}
file.write(JVM_NAME.getBytes(StandardCharsets.UTF_8));
- LOG.info("Lock on " + lockF + " acquired by nodename " + JVM_NAME);
+ LOG.info("Lock on {} acquired by nodename {}", lockF, JVM_NAME);
} catch (OverlappingFileLockException oe) {
// Cannot read from the locked file on Windows.
LOG.error("It appears that another process "
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
index 56972c3f7..4fcf46389 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.storage;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
@@ -39,7 +40,7 @@ public class RaftStorageImpl implements RaftStorage {
private final StartupOption startupOption;
private final CorruptionPolicy logCorruptionPolicy;
private volatile StorageState state = StorageState.UNINITIALIZED;
- private volatile RaftStorageMetadataFileImpl metaFile;
+ private final MetaFile metaFile = new MetaFile();
RaftStorageImpl(File dir, SizeInBytes freeSpaceMin, StartupOption option,
CorruptionPolicy logCorruptionPolicy) {
LOG.debug("newRaftStorage: {}, freeSpaceMin={}, option={},
logCorruptionPolicy={}",
@@ -60,7 +61,8 @@ public class RaftStorageImpl implements RaftStorage {
format();
state = storageDir.analyzeStorage(false);
} else {
- state = analyzeAndRecoverStorage(true); // metaFile is initialized here
+ // metaFile is initialized here
+ state = analyzeAndRecoverStorage(true);
}
} catch (Throwable t) {
unlockOnFailure(storageDir);
@@ -91,9 +93,8 @@ public class RaftStorageImpl implements RaftStorage {
private void format() throws IOException {
storageDir.clearDirectory();
- metaFile = new RaftStorageMetadataFileImpl(storageDir.getMetaFile());
- metaFile.persist(RaftStorageMetadata.getDefault());
- LOG.info("Storage directory " + storageDir.getRoot() + " has been
successfully formatted.");
+
metaFile.set(storageDir.getMetaFile()).persist(RaftStorageMetadata.getDefault());
+ LOG.info("Storage directory {} has been successfully formatted.",
storageDir.getRoot());
}
private void cleanMetaTmpFile() throws IOException {
@@ -112,8 +113,7 @@ public class RaftStorageImpl implements RaftStorage {
if (!f.exists()) {
throw new FileNotFoundException("Metadata file " + f + " does not
exists.");
}
- metaFile = new RaftStorageMetadataFileImpl(f);
- final RaftStorageMetadata metadata = metaFile.getMetadata();
+ final RaftStorageMetadata metadata = metaFile.set(f).getMetadata();
LOG.info("Read {} from {}", metadata, f);
return StorageState.NORMAL;
} else if (storageState == StorageState.NOT_FORMATTED &&
@@ -137,7 +137,7 @@ public class RaftStorageImpl implements RaftStorage {
@Override
public RaftStorageMetadataFile getMetadataFile() {
- return metaFile;
+ return metaFile.get();
}
public void writeRaftConfiguration(LogEntryProto conf) {
@@ -166,4 +166,18 @@ public class RaftStorageImpl implements RaftStorage {
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + getStorageDir();
}
+
+ static class MetaFile {
+ private final AtomicReference<RaftStorageMetadataFileImpl> ref = new
AtomicReference<>();
+
+ RaftStorageMetadataFile get() {
+ return ref.get();
+ }
+
+ RaftStorageMetadataFile set(File file) {
+ final RaftStorageMetadataFileImpl impl = new
RaftStorageMetadataFileImpl(file);
+ ref.set(impl);
+ return impl;
+ }
+ }
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index ab276bdcf..8d291acb2 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -61,7 +61,7 @@ public class SnapshotManager {
private final RaftPeerId selfId;
private final Supplier<File> snapshotDir;
- private final Supplier<File> tmp;
+ private final Supplier<File> snapshotTmpDir;
private final Function<FileChunkProto, String> getRelativePath;
private final Supplier<MessageDigest> digester =
JavaUtils.memoize(MD5Hash::getDigester);
@@ -69,7 +69,7 @@ public class SnapshotManager {
this.selfId = selfId;
this.snapshotDir = MemoizedSupplier.valueOf(
() -> Optional.ofNullable(smStorage.getSnapshotDir()).orElseGet(() ->
dir.get().getStateMachineDir()));
- this.tmp = MemoizedSupplier.valueOf(
+ this.snapshotTmpDir = MemoizedSupplier.valueOf(
() -> Optional.ofNullable(smStorage.getTmpDir()).orElseGet(() ->
dir.get().getTmpDir()));
final Supplier<Path> smDir = MemoizedSupplier.valueOf(() ->
dir.get().getStateMachineDir().toPath());
@@ -82,7 +82,7 @@ public class SnapshotManager {
final long lastIncludedIndex =
snapshotChunkRequest.getTermIndex().getIndex();
// create a unique temporary directory
- final File tmpDir = new File(tmp.get(), "snapshot-" +
snapshotChunkRequest.getRequestId());
+ final File tmpDir = new File(this.snapshotTmpDir.get(), "snapshot-" +
snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
index bf10141e3..b6199ea02 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
@@ -50,6 +50,7 @@ public final class StorageImplUtils {
}
/** Create a {@link RaftStorageImpl}. */
+ @SuppressWarnings("java:S2095") // return Closable
public static RaftStorageImpl newRaftStorage(File dir, SizeInBytes
freeSpaceMin,
RaftStorage.StartupOption option, Log.CorruptionPolicy
logCorruptionPolicy) {
return new RaftStorageImpl(dir, freeSpaceMin, option, logCorruptionPolicy);
@@ -130,13 +131,14 @@ public final class StorageImplUtils {
}
}
+ @SuppressWarnings("java:S1181") // catch Throwable
private RaftStorageImpl format() throws IOException {
if (!existingSubs.isEmpty()) {
throw new IOException("Failed to " + option + ": One or more existing
directories found " + existingSubs
+ " for " + storageDirName);
}
- for (; !dirsPerVol.isEmpty(); ) {
+ while (!dirsPerVol.isEmpty()) {
final File vol = chooseMin(dirsPerVol);
final File dir = new File(vol, storageDirName);
try {
@@ -151,6 +153,7 @@ public final class StorageImplUtils {
throw new IOException("Failed to FORMAT a new storage dir for " +
storageDirName + " from " + dirsInConf);
}
+ @SuppressWarnings("java:S1181") // catch Throwable
private RaftStorageImpl recover() throws IOException {
final int size = existingSubs.size();
if (size > 1) {
@@ -166,10 +169,9 @@ public final class StorageImplUtils {
final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin,
StartupOption.RECOVER, logCorruptionPolicy);
storage.initialize();
return storage;
+ } catch (IOException e) {
+ throw e;
} catch (Throwable e) {
- if (e instanceof IOException) {
- throw e;
- }
throw new IOException("Failed to initialize the existing directory " +
dir.getAbsolutePath(), e);
}
}