This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 84fa686e08 apply spotless
84fa686e08 is described below
commit 84fa686e08a6ecb740f0e64f041ca4b7bd03cae3
Author: Tian Jiang <[email protected]>
AuthorDate: Mon May 8 09:06:40 2023 +0800
apply spotless
---
.../serialization/SyncLogDequeSerializer.java | 113 ++++++++-------------
1 file changed, 44 insertions(+), 69 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 59947f3ff7..f8a5f3956e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -18,10 +18,6 @@
*/
package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -60,6 +56,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -75,14 +75,10 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
private static final String LOG_DATA_FILE_SUFFIX = "data";
private static final String LOG_INDEX_FILE_SUFFIX = "idx";
- /**
- * the log data files
- */
+ /** the log data files */
private List<File> logDataFileList;
- /**
- * the log index files
- */
+ /** the log index files */
private List<IndexFileDescriptor> logIndexFileList;
private LogParser parser = LogParser.getINSTANCE();
@@ -93,14 +89,10 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
private HardState state;
private String name;
- /**
- * min version of available log
- */
+ /** min version of available log */
private long minAvailableVersion = 0;
- /**
- * max version of available log
- */
+ /** max version of available log */
private long maxAvailableVersion = Long.MAX_VALUE;
private String logDir;
@@ -160,9 +152,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
- /**
- * the lock uses when change the log data files or log index files
- */
+ /** the lock uses when change the log data files or log index files */
private final Lock lock = new ReentrantLock();
private volatile boolean isClosed = false;
@@ -200,8 +190,8 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
.namingPattern("persist-log-delete-" + logDir)
.daemon(true)
.build());
- this.flushingLogExecutorService = Executors.newSingleThreadExecutor(
- (r) -> new Thread(r, name + "-flushRaftLog"));
+ this.flushingLogExecutorService =
+ Executors.newSingleThreadExecutor((r) -> new Thread(r, name +
"-flushRaftLog"));
this.persistLogDeleteLogFuture =
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
@@ -230,9 +220,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
return systemDir + File.separator + groupId + File.separator + "raftLog" +
File.separator;
}
- /**
- * for log tools
- */
+ /** for log tools */
@Override
public LogManagerMeta getMeta() {
return meta;
@@ -244,9 +232,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
meta.setLastAppliedIndex(applyIndex);
}
- /**
- * Recover all the logs in disk. This function will be called once this
instance is created.
- */
+ /** Recover all the logs in disk. This function will be called once this
instance is created. */
@Override
public List<Entry> getAllEntriesAfterAppliedIndex() {
logger.debug(
@@ -260,8 +246,8 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
* is uncommitted for persistent LogManagerMeta(meta's info is stale).We
need to recover these
* already persistent logs.
*
- * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log
file has actually
- * been flushed to 7,when we restart cluster,we need to recover 6 and 7.
+ * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log
file has actually been
+ * flushed to 7,when we restart cluster,we need to recover 6 and 7.
*
* <p>Maybe,we can extract getAllEntriesAfterAppliedIndex and
getAllEntriesAfterCommittedIndex
* into getAllEntriesByIndex,but now there are too many test cases using it.
@@ -324,26 +310,25 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
logDataBuffer.reset();
logIndexBuffer.reset();
flushLogBuffer();
- checkCloseCurrentFile(log.getCurrLogIndex() - 1);
logDataBuffer.put(logData);
lastLogIndex = log.getCurrLogIndex();
}
}
}
- private void checkCloseCurrentFile(long commitIndex) {
+ private void checkCloseCurrentFile(long fileEndIndex) {
if (offsetOfTheCurrentLogDataOutputStream >
maxRaftLogPersistDataSizePerFile) {
try {
- closeCurrentFile(commitIndex);
+ closeCurrentFile(fileEndIndex);
serializeMeta(meta);
- createNewLogFile(logDir, commitIndex + 1);
+ createNewLogFile(logDir, fileEndIndex + 1);
} catch (IOException e) {
logger.error("check close current file failed", e);
}
}
}
- private void closeCurrentFile(long commitIndex) throws IOException {
+ private void closeCurrentFile(long fileEndIndex) throws IOException {
if (currentLogDataOutputStream != null) {
currentLogDataOutputStream.close();
logger.info("{}: Closed a log data file {}", this,
getCurrentLogDataFile());
@@ -353,7 +338,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
String newDataFileName =
currentLogDataFile
.getName()
- .replaceAll(String.valueOf(Long.MAX_VALUE),
String.valueOf(commitIndex));
+ .replaceAll(String.valueOf(Long.MAX_VALUE),
String.valueOf(fileEndIndex));
File newCurrentLogDatFile =
SystemFileFactory.INSTANCE.getFile(
currentLogDataFile.getParent() + File.separator +
newDataFileName);
@@ -381,7 +366,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
currentLogIndexFile
.file
.getName()
- .replaceAll(String.valueOf(Long.MAX_VALUE),
String.valueOf(commitIndex));
+ .replaceAll(String.valueOf(Long.MAX_VALUE),
String.valueOf(fileEndIndex));
File newCurrentLogIndexFile =
SystemFileFactory.INSTANCE.getFile(
currentLogIndexFile.file.getParent() + File.separator +
newIndexFileName);
@@ -394,7 +379,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
newCurrentLogIndexFile.getAbsoluteFile());
logIndexFileList.get(logIndexFileList.size() - 1).file =
newCurrentLogIndexFile;
- logIndexFileList.get(logIndexFileList.size() - 1).endIndex = commitIndex;
+ logIndexFileList.get(logIndexFileList.size() - 1).endIndex =
fileEndIndex;
}
offsetOfTheCurrentLogDataOutputStream = 0;
@@ -434,8 +419,8 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
try {
checkStream();
// 1. write to the log data file
- byte[] compressed = compressor.compress(flushingLogDataBuffer.array(), 0,
- flushingLogDataBuffer.position());
+ byte[] compressed =
+ compressor.compress(flushingLogDataBuffer.array(), 0,
flushingLogDataBuffer.position());
ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
logIndexOffsetList.add(new Pair<>(lastLogIndex,
offsetOfTheCurrentLogDataOutputStream));
flushingLogIndexBuffer.putLong(lastLogIndex);
@@ -450,6 +435,8 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
currentLogIndexOutputStream.getChannel().force(true);
}
persistedLogIndex = currentLastIndex;
+
+ checkCloseCurrentFile();
} catch (IOException e) {
logger.error("Error in logs serialization: ", e);
return;
@@ -485,9 +472,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
}
}
- /**
- * flush the log buffer and check if the file needs to be closed
- */
+ /** flush the log buffer and check if the file needs to be closed */
@Override
public void forceFlushLogBuffer() {
lock.lock();
@@ -531,9 +516,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
}
}
- /**
- * The file name rules are as follows:
${startLogIndex}-${endLogIndex}-${version}.data
- */
+ /** The file name rules are as follows:
${startLogIndex}-${endLogIndex}-${version}.data */
private void recoverLogFiles() {
// 1. first we should recover the log index file
recoverLogFiles(LOG_INDEX_FILE_SUFFIX);
@@ -580,9 +563,9 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
/**
* Check that the file is legal or not
*
- * @param file file needs to be check
- * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
- * {@link SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
+ * @param file file needs to be check
+ * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
{@link
+ * SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
* @return true if the file legal otherwise false
*/
private boolean checkLogFile(File file, String fileType) {
@@ -810,9 +793,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
}
}
- /**
- * for unclosed file, the file name is
${startIndex}-${Long.MAX_VALUE}-{version}
- */
+ /** for unclosed file, the file name is
${startIndex}-${Long.MAX_VALUE}-{version} */
private void createNewLogFile(String dirName, long startLogIndex) throws
IOException {
lock.lock();
try {
@@ -1163,7 +1144,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
/**
* @param startIndex the log start index
- * @param endIndex the log end index
+ * @param endIndex the log end index
* @return the raft log which index between [startIndex, endIndex] or empty
if not found
*/
@Override
@@ -1268,9 +1249,9 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
/**
* @param startIndex the log start index
- * @param endIndex the log end index
+ * @param endIndex the log end index
* @return first value-> the log data file, second value-> the left value is
the start offset of
- * the file, the right is the end offset of the file
+ * the file, the right is the end offset of the file
*/
private List<Pair<File, Pair<Long, Long>>> getLogDataFileAndOffset(
long startIndex, long endIndex) {
@@ -1337,8 +1318,8 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
/**
* @param startIndex the start log index
* @return the first value of the pair is the log index file which contains
the start index; the
- * second pair's first value is the file's start log index. the second
pair's second value is the
- * file's end log index. null if not found
+ * second pair's first value is the file's start log index. the second
pair's second value is
+ * the file's end log index. null if not found
*/
public IndexFileDescriptor getLogIndexFile(long startIndex) {
for (IndexFileDescriptor descriptor : logIndexFileList) {
@@ -1353,8 +1334,8 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
/**
* @param startIndex the start log index
* @return the first value of the pair is the log data file which contains
the start index; the
- * second pair's first value is the file's start log index. the second
pair's second value is the
- * file's end log index. null if not found
+ * second pair's first value is the file's start log index. the second
pair's second value is
+ * the file's end log index. null if not found
*/
public Pair<File, Pair<Long, Long>> getLogDataFile(long startIndex) {
for (File file : logDataFileList) {
@@ -1373,9 +1354,9 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
}
/**
- * @param file the log data file
+ * @param file the log data file
* @param startAndEndOffset the left value is the start offset of the file,
the right is the end
- * offset of the file
+ * offset of the file
* @return the logs between start offset and end offset
*/
private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long>
startAndEndOffset) {
@@ -1473,9 +1454,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
* automatically increased by saveInterval to avoid conflicts.
*/
private static long saveInterval = 100;
- /**
- * time partition id to dividing time series into different storage group
- */
+ /** time partition id to dividing time series into different storage
group */
private long timePartitionId;
private long prevVersion;
@@ -1489,9 +1468,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
restore();
}
- /**
- * only used for upgrading
- */
+ /** only used for upgrading */
public SimpleFileVersionController(String directoryPath) throws
IOException {
this.directoryPath = directoryPath + File.separator + UPGRADE_DIR;
restore();
@@ -1554,9 +1531,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
prevVersion = currVersion;
}
- /**
- * recovery from disk
- */
+ /** recovery from disk */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
private void restore() throws IOException {
File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);