This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 84fa686e08 apply spotless
84fa686e08 is described below

commit 84fa686e08a6ecb740f0e64f041ca4b7bd03cae3
Author: Tian Jiang <[email protected]>
AuthorDate: Mon May 8 09:06:40 2023 +0800

    apply spotless
---
 .../serialization/SyncLogDequeSerializer.java      | 113 ++++++++-------------
 1 file changed, 44 insertions(+), 69 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 59947f3ff7..f8a5f3956e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -18,10 +18,6 @@
  */
 package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -60,6 +56,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -75,14 +75,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private static final String LOG_DATA_FILE_SUFFIX = "data";
   private static final String LOG_INDEX_FILE_SUFFIX = "idx";
 
-  /**
-   * the log data files
-   */
+  /** the log data files */
   private List<File> logDataFileList;
 
-  /**
-   * the log index files
-   */
+  /** the log index files */
   private List<IndexFileDescriptor> logIndexFileList;
 
   private LogParser parser = LogParser.getINSTANCE();
@@ -93,14 +89,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private HardState state;
   private String name;
 
-  /**
-   * min version of available log
-   */
+  /** min version of available log */
   private long minAvailableVersion = 0;
 
-  /**
-   * max version of available log
-   */
+  /** max version of available log */
   private long maxAvailableVersion = Long.MAX_VALUE;
 
   private String logDir;
@@ -160,9 +152,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
 
-  /**
-   * the lock uses when change the log data files or log index files
-   */
+  /** the lock uses when change the log data files or log index files */
   private final Lock lock = new ReentrantLock();
 
   private volatile boolean isClosed = false;
@@ -200,8 +190,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
                 .namingPattern("persist-log-delete-" + logDir)
                 .daemon(true)
                 .build());
-    this.flushingLogExecutorService = Executors.newSingleThreadExecutor(
-        (r) -> new Thread(r, name + "-flushRaftLog"));
+    this.flushingLogExecutorService =
+        Executors.newSingleThreadExecutor((r) -> new Thread(r, name + "-flushRaftLog"));
 
     this.persistLogDeleteLogFuture =
         ScheduledExecutorUtil.safelyScheduleAtFixedRate(
@@ -230,9 +220,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
   }
 
-  /**
-   * for log tools
-   */
+  /** for log tools */
   @Override
   public LogManagerMeta getMeta() {
     return meta;
@@ -244,9 +232,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     meta.setLastAppliedIndex(applyIndex);
   }
 
-  /**
-   * Recover all the logs in disk. This function will be called once this instance is created.
-   */
+  /** Recover all the logs in disk. This function will be called once this instance is created. */
   @Override
   public List<Entry> getAllEntriesAfterAppliedIndex() {
     logger.debug(
@@ -260,8 +246,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    * is uncommitted for persistent LogManagerMeta(meta's info is stale).We need to recover these
    * already persistent logs.
    *
-   * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually
-   * been flushed to 7,when we restart cluster,we need to recover 6 and 7.
+   * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually been
+   * flushed to 7,when we restart cluster,we need to recover 6 and 7.
    *
    * <p>Maybe,we can extract getAllEntriesAfterAppliedIndex and getAllEntriesAfterCommittedIndex
    * into getAllEntriesByIndex,but now there are too many test cases using it.
@@ -324,26 +310,25 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         logDataBuffer.reset();
         logIndexBuffer.reset();
         flushLogBuffer();
-        checkCloseCurrentFile(log.getCurrLogIndex() - 1);
         logDataBuffer.put(logData);
         lastLogIndex = log.getCurrLogIndex();
       }
     }
   }
 
-  private void checkCloseCurrentFile(long commitIndex) {
+  private void checkCloseCurrentFile(long fileEndIndex) {
     if (offsetOfTheCurrentLogDataOutputStream > maxRaftLogPersistDataSizePerFile) {
       try {
-        closeCurrentFile(commitIndex);
+        closeCurrentFile(fileEndIndex);
         serializeMeta(meta);
-        createNewLogFile(logDir, commitIndex + 1);
+        createNewLogFile(logDir, fileEndIndex + 1);
       } catch (IOException e) {
         logger.error("check close current file failed", e);
       }
     }
   }
 
-  private void closeCurrentFile(long commitIndex) throws IOException {
+  private void closeCurrentFile(long fileEndIndex) throws IOException {
     if (currentLogDataOutputStream != null) {
       currentLogDataOutputStream.close();
       logger.info("{}: Closed a log data file {}", this, getCurrentLogDataFile());
@@ -353,7 +338,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       String newDataFileName =
           currentLogDataFile
               .getName()
-              .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(commitIndex));
+              .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(fileEndIndex));
       File newCurrentLogDatFile =
           SystemFileFactory.INSTANCE.getFile(
               currentLogDataFile.getParent() + File.separator + newDataFileName);
@@ -381,7 +366,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
           currentLogIndexFile
               .file
               .getName()
-              .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(commitIndex));
+              .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(fileEndIndex));
       File newCurrentLogIndexFile =
           SystemFileFactory.INSTANCE.getFile(
               currentLogIndexFile.file.getParent() + File.separator + newIndexFileName);
@@ -394,7 +379,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
           newCurrentLogIndexFile.getAbsoluteFile());
 
       logIndexFileList.get(logIndexFileList.size() - 1).file = newCurrentLogIndexFile;
-      logIndexFileList.get(logIndexFileList.size() - 1).endIndex = commitIndex;
+      logIndexFileList.get(logIndexFileList.size() - 1).endIndex = fileEndIndex;
     }
 
     offsetOfTheCurrentLogDataOutputStream = 0;
@@ -434,8 +419,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     try {
       checkStream();
       // 1. write to the log data file
-      byte[] compressed = compressor.compress(flushingLogDataBuffer.array(), 0,
-          flushingLogDataBuffer.position());
+      byte[] compressed =
+          compressor.compress(flushingLogDataBuffer.array(), 0, flushingLogDataBuffer.position());
       ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
       logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
       flushingLogIndexBuffer.putLong(lastLogIndex);
@@ -450,6 +435,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         currentLogIndexOutputStream.getChannel().force(true);
       }
       persistedLogIndex = currentLastIndex;
+
+      checkCloseCurrentFile();
     } catch (IOException e) {
       logger.error("Error in logs serialization: ", e);
       return;
@@ -485,9 +472,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /**
-   * flush the log buffer and check if the file needs to be closed
-   */
+  /** flush the log buffer and check if the file needs to be closed */
   @Override
   public void forceFlushLogBuffer() {
     lock.lock();
@@ -531,9 +516,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /**
-   * The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data
-   */
+  /** The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data */
   private void recoverLogFiles() {
     // 1. first we should recover the log index file
     recoverLogFiles(LOG_INDEX_FILE_SUFFIX);
@@ -580,9 +563,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * Check that the file is legal or not
    *
-   * @param file     file needs to be check
-   * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
-   *                 {@link SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
+   * @param file file needs to be check
+   * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or {@link
+   *     SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
    * @return true if the file legal otherwise false
    */
   private boolean checkLogFile(File file, String fileType) {
@@ -810,9 +793,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /**
-   * for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version}
-   */
+  /** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
   private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
     lock.lock();
     try {
@@ -1163,7 +1144,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /**
    * @param startIndex the log start index
-   * @param endIndex   the log end index
+   * @param endIndex the log end index
    * @return the raft log which index between [startIndex, endIndex] or empty if not found
    */
   @Override
@@ -1268,9 +1249,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /**
    * @param startIndex the log start index
-   * @param endIndex   the log end index
+   * @param endIndex the log end index
    * @return first value-> the log data file, second value-> the left value is the start offset of
-   * the file, the right is the end offset of the file
+   *     the file, the right is the end offset of the file
    */
   private List<Pair<File, Pair<Long, Long>>> getLogDataFileAndOffset(
       long startIndex, long endIndex) {
@@ -1337,8 +1318,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * @param startIndex the start log index
    * @return the first value of the pair is the log index file which contains the start index; the
-   * second pair's first value is the file's start log index. the second pair's second value is the
-   * file's end log index. null if not found
+   *     second pair's first value is the file's start log index. the second pair's second value is
+   *     the file's end log index. null if not found
    */
   public IndexFileDescriptor getLogIndexFile(long startIndex) {
     for (IndexFileDescriptor descriptor : logIndexFileList) {
@@ -1353,8 +1334,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * @param startIndex the start log index
    * @return the first value of the pair is the log data file which contains the start index; the
-   * second pair's first value is the file's start log index. the second pair's second value is the
-   * file's end log index. null if not found
+   *     second pair's first value is the file's start log index. the second pair's second value is
+   *     the file's end log index. null if not found
    */
   public Pair<File, Pair<Long, Long>> getLogDataFile(long startIndex) {
     for (File file : logDataFileList) {
@@ -1373,9 +1354,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   /**
-   * @param file              the log data file
+   * @param file the log data file
    * @param startAndEndOffset the left value is the start offset of the file, the right is the end
-   *                          offset of the file
+   *     offset of the file
    * @return the logs between start offset and end offset
    */
   private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long> startAndEndOffset) {
@@ -1473,9 +1454,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
        * automatically increased by saveInterval to avoid conflicts.
        */
       private static long saveInterval = 100;
-      /**
-       * time partition id to dividing time series into different storage group
-       */
+      /** time partition id to dividing time series into different storage group */
       private long timePartitionId;
 
       private long prevVersion;
@@ -1489,9 +1468,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         restore();
       }
 
-      /**
-       * only used for upgrading
-       */
+      /** only used for upgrading */
       public SimpleFileVersionController(String directoryPath) throws IOException {
         this.directoryPath = directoryPath + File.separator + UPGRADE_DIR;
         restore();
@@ -1554,9 +1531,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         prevVersion = currVersion;
       }
 
-      /**
-       * recovery from disk
-       */
+      /** recovery from disk */
       @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
       private void restore() throws IOException {
         File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);

Reply via email to