This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4875c24c566 [IOTDB-6087] Implement stream interface of Mods read
4875c24c566 is described below

commit 4875c24c5661a06a84e38cf4e54a8a43b996789a
Author: Weihao Li <[email protected]>
AuthorDate: Fri Jul 28 10:34:15 2023 +0800

    [IOTDB-6087] Implement stream interface of Mods read
---
 .../execution/fragment/QueryContext.java           |   2 +-
 .../db/storageengine/dataregion/DataRegion.java    | 111 +++--
 .../dataregion/modification/ModificationFile.java  |  59 +--
 .../io/LocalTextModificationAccessor.java          | 177 ++++++--
 .../modification/io/ModificationReader.java        |   9 +
 .../modification/io/ModificationWriter.java        |   7 +-
 .../modification/utils/TracedBufferedReader.java   | 463 ---------------------
 .../modification/ModificationFileTest.java         |  25 +-
 .../io/LocalTextModificationAccessorTest.java      |  96 +++--
 9 files changed, 289 insertions(+), 660 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index d927d90c529..60ee9baeaac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -95,7 +95,7 @@ public class QueryContext {
               fileModCache.get(modFile.getFilePath());
           if (allModifications == null) {
             allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
-            for (Modification modification : modFile.getModifications()) {
+            for (Modification modification : modFile.getModificationsIter()) {
               allModifications.append(modification.getPath(), modification);
             }
             fileModCache.put(modFile.getFilePath(), allModifications);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 102392fae98..7ba71add820 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1818,8 +1818,6 @@ public class DataRegion implements IDataRegionForQuery {
     // mod files in mergingModification, sequenceFileList, and 
unsequenceFileList
     writeLock("delete");
 
-    // record files which are updated so that we can roll back them in case of 
exception
-    List<ModificationFile> updatedModFiles = new ArrayList<>();
     boolean hasReleasedLock = false;
 
     try {
@@ -1855,30 +1853,14 @@ public class DataRegion implements IDataRegionForQuery {
       // deviceMatchInfo contains the DeviceId means this device matched the 
pattern
       Set<String> deviceMatchInfo = new HashSet<>();
       deleteDataInFiles(
-          unsealedTsFileResource,
-          deletion,
-          devicePaths,
-          updatedModFiles,
-          timePartitionFilter,
-          deviceMatchInfo);
+          unsealedTsFileResource, deletion, devicePaths, timePartitionFilter, 
deviceMatchInfo);
       writeUnlock();
       hasReleasedLock = true;
 
       deleteDataInFiles(
-          sealedTsFileResource,
-          deletion,
-          devicePaths,
-          updatedModFiles,
-          timePartitionFilter,
-          deviceMatchInfo);
+          sealedTsFileResource, deletion, devicePaths, timePartitionFilter, 
deviceMatchInfo);
 
     } catch (Exception e) {
-      // roll back
-      for (ModificationFile modFile : updatedModFiles) {
-        modFile.abort();
-        // remember to close mod file
-        modFile.close();
-      }
       throw new IOException(e);
     } finally {
       if (!hasReleasedLock) {
@@ -1985,11 +1967,12 @@ public class DataRegion implements IDataRegionForQuery {
     return true;
   }
 
+  // suppress warn of Throwable catch
+  @SuppressWarnings("java:S1181")
   private void deleteDataInFiles(
       Collection<TsFileResource> tsFileResourceList,
       Deletion deletion,
       Set<PartialPath> devicePaths,
-      List<ModificationFile> updatedModFiles,
       TimePartitionFilter timePartitionFilter,
       Set<String> deviceMatchInfo)
       throws IOException {
@@ -2006,52 +1989,60 @@ public class DataRegion implements IDataRegionForQuery {
 
       ModificationFile modFile = tsFileResource.getModFile();
       if (tsFileResource.isClosed()) {
-        // delete data in sealed file
-        if (tsFileResource.isCompacting()) {
-          // we have to set modification offset to MAX_VALUE, as the offset of 
source chunk may
-          // change after compaction
-          deletion.setFileOffset(Long.MAX_VALUE);
-          // write deletion into compaction modification file
-          tsFileResource.getCompactionModFile().write(deletion);
-          // write deletion into modification file to enable read during 
compaction
-          modFile.write(deletion);
-          // remember to close mod file
-          tsFileResource.getCompactionModFile().close();
-          modFile.close();
-        } else {
-          deletion.setFileOffset(tsFileResource.getTsFileSize());
-          // write deletion into modification file
-          boolean modFileExists = modFile.exists();
-          long originSize = modFile.getSize();
-          modFile.write(deletion);
-
-          // remember to close mod file
-          modFile.close();
-
-          // if file length greater than 1M,execute compact.
-          modFile.compact();
+        long originSize = -1;
+        synchronized (modFile) {
+          try {
+            originSize = modFile.getSize();
+            // delete data in sealed file
+            if (tsFileResource.isCompacting()) {
+              // we have to set modification offset to MAX_VALUE, as the 
offset of source chunk may
+              // change after compaction
+              deletion.setFileOffset(Long.MAX_VALUE);
+              // write deletion into compaction modification file
+              tsFileResource.getCompactionModFile().write(deletion);
+              // write deletion into modification file to enable read during 
compaction
+              modFile.write(deletion);
+              // remember to close mod file
+              tsFileResource.getCompactionModFile().close();
+              modFile.close();
+            } else {
+              deletion.setFileOffset(tsFileResource.getTsFileSize());
+              // write deletion into modification file
+              boolean modFileExists = modFile.exists();
+
+              modFile.write(deletion);
+
+              // remember to close mod file
+              modFile.close();
+
+              // if file length greater than 1M,execute compact.
+              modFile.compact();
+
+              if (!modFileExists) {
+                FileMetrics.getInstance().increaseModFileNum(1);
+              }
 
-          if (!modFileExists) {
-            FileMetrics.getInstance().increaseModFileNum(1);
+              // The file size may be smaller than the original file, so the 
increment here may be
+              // negative
+              FileMetrics.getInstance().increaseModFileSize(modFile.getSize() 
- originSize);
+            }
+          } catch (Throwable t) {
+            if (originSize != -1) {
+              modFile.truncate(originSize);
+            }
+            throw t;
           }
-
-          // The file size may be smaller than the original file, so the 
increment here may be
-          // negative
-          FileMetrics.getInstance().increaseModFileSize(modFile.getSize() - 
originSize);
+          logger.info(
+              "[Deletion] Deletion with path:{}, time:{}-{} written into mods 
file:{}.",
+              deletion.getPath(),
+              deletion.getStartTime(),
+              deletion.getEndTime(),
+              modFile.getFilePath());
         }
-        logger.info(
-            "[Deletion] Deletion with path:{}, time:{}-{} written into mods 
file:{}.",
-            deletion.getPath(),
-            deletion.getStartTime(),
-            deletion.getEndTime(),
-            modFile.getFilePath());
       } else {
         // delete data in memory of unsealed file
         tsFileResource.getProcessor().deleteDataInMemory(deletion, 
devicePaths);
       }
-
-      // add a record in case of rollback
-      updatedModFiles.add(modFile);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index 609975aec7c..6491f4c4eaa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.FileAlreadyExistsException;
@@ -53,10 +55,11 @@ public class ModificationFile implements AutoCloseable {
   public static final String COMPACT_SUFFIX = ".settle";
   public static final String COMPACTION_FILE_SUFFIX = ".compaction.mods";
 
-  // lazy loaded, set null when closed
-  private List<Modification> modifications;
-  private ModificationWriter writer;
-  private ModificationReader reader;
+  // whether to verify the last line, it may be incomplete in extreme cases
+  private boolean needVerify = true;
+
+  private final ModificationWriter writer;
+  private final ModificationReader reader;
   private String filePath;
   private final SecureRandom random = new SecureRandom();
 
@@ -75,33 +78,11 @@ public class ModificationFile implements AutoCloseable {
     this.filePath = filePath;
   }
 
-  private void init() {
-    synchronized (this) {
-      modifications = (List<Modification>) reader.read();
-    }
-  }
-
-  private void checkInit() {
-    if (modifications == null) {
-      init();
-    }
-  }
-
   /** Release resources such as streams and caches. */
   @Override
   public void close() throws IOException {
     synchronized (this) {
       writer.close();
-      modifications = null;
-    }
-  }
-
-  public void abort() throws IOException {
-    synchronized (this) {
-      writer.abort();
-      if (modifications != null && !modifications.isEmpty()) {
-        modifications.remove(modifications.size() - 1);
-      }
     }
   }
 
@@ -114,13 +95,19 @@ public class ModificationFile implements AutoCloseable {
    */
   public void write(Modification mod) throws IOException {
     synchronized (this) {
-      writer.write(mod);
-      if (modifications != null) {
-        modifications.add(mod);
+      if (needVerify && new File(filePath).exists()) {
+        writer.mayTruncateLastLine();
+        needVerify = false;
       }
+      writer.write(mod);
     }
   }
 
+  @GuardedBy("TsFileResource-WriteLock")
+  public void truncate(long size) {
+    writer.truncate(size);
+  }
+
   /**
    * Get all modifications stored in this file.
    *
@@ -128,11 +115,14 @@ public class ModificationFile implements AutoCloseable {
    */
   public Collection<Modification> getModifications() {
     synchronized (this) {
-      checkInit();
-      return new ArrayList<>(modifications);
+      return reader.read();
     }
   }
 
+  public Iterable<Modification> getModificationsIter() {
+    return reader::getModificationIterator;
+  }
+
   public String getFilePath() {
     return filePath;
   }
@@ -228,7 +218,6 @@ public class ModificationFile implements AutoCloseable {
         Files.move(new File(newModsFileName).toPath(), new 
File(filePath).toPath());
         logger.info("{} settle successful", filePath);
 
-        updateModifications(allSettledModifications);
         if (getSize() > COMPACT_THRESHOLD) {
           logger.warn(
               "After the mod file is settled, the file size is still greater 
than 1M,the size of the file before settle is {},after settled the file size is 
{}",
@@ -276,10 +265,4 @@ public class ModificationFile implements AutoCloseable {
     }
     return result;
   }
-
-  public void updateModifications(List<Modification> modifications) {
-    synchronized (this) {
-      this.modifications = modifications;
-    }
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
index 6e94a24b120..242c6fd21e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
@@ -21,24 +21,28 @@ package 
org.apache.iotdb.db.storageengine.dataregion.modification.io;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.utils.TracedBufferedReader;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 /**
  * LocalTextModificationAccessor uses a file on local file system to store the 
modifications in text
@@ -49,9 +53,10 @@ public class LocalTextModificationAccessor
 
   private static final Logger logger = 
LoggerFactory.getLogger(LocalTextModificationAccessor.class);
   private static final String SEPARATOR = ",";
-  private static final String ABORT_MARK = "aborted";
+  private static final String NO_MODIFICATION_MSG =
+      "No modification has been written to this file[{}]";
 
-  private String filePath;
+  private final String filePath;
   private BufferedWriter writer;
 
   /**
@@ -65,44 +70,82 @@ public class LocalTextModificationAccessor
 
   @Override
   public Collection<Modification> read() {
+    List<Modification> result = new ArrayList<>();
+    Iterator<Modification> iterator = getModificationIterator();
+    while (iterator.hasNext()) {
+      result.add(iterator.next());
+    }
+    return result;
+  }
+
+  // we need to hold the reader for the Iterator, cannot use auto close or 
close in finally block
+  @SuppressWarnings("java:S2095")
+  @Override
+  public Iterator<Modification> getModificationIterator() {
     File file = FSFactoryProducer.getFSFactory().getFile(filePath);
-    if (!file.exists()) {
-      logger.debug("No modification has been written to this file");
-      return new ArrayList<>();
-    }
-
-    String line;
-    long truncatedSize = 0;
-    boolean crashed = false;
-    List<Modification> modificationList = new ArrayList<>();
-    try (TracedBufferedReader reader = new TracedBufferedReader(new 
FileReader(file))) {
-      while ((line = reader.readLine()) != null) {
-        if (line.equals(ABORT_MARK) && !modificationList.isEmpty()) {
-          modificationList.remove(modificationList.size() - 1);
-        } else {
-          modificationList.add(decodeModification(line));
+    final BufferedReader reader;
+    try {
+      reader = new BufferedReader(new FileReader(file));
+    } catch (FileNotFoundException e) {
+      logger.debug(NO_MODIFICATION_MSG, file);
+
+      // return empty iterator
+      return new Iterator<Modification>() {
+        @Override
+        public boolean hasNext() {
+          return false;
         }
-        truncatedSize = reader.position();
+
+        @Override
+        public Modification next() {
+          throw new NoSuchElementException();
+        }
+      };
+    }
+
+    final Modification[] cachedModification = new Modification[1];
+    return new Iterator<Modification>() {
+      @Override
+      public boolean hasNext() {
+        try {
+          if (cachedModification[0] == null) {
+            String line = reader.readLine();
+            if (line == null) {
+              reader.close();
+              return false;
+            } else {
+              return decodeModificationAndCache(reader, cachedModification, 
line);
+            }
+          }
+        } catch (IOException e) {
+          logger.warn("An error occurred when reading modifications", e);
+        }
+        return true;
       }
-    } catch (IOException e) {
-      crashed = true;
-      logger.error(
-          "An error occurred when reading modifications, and the remaining 
modifications will be truncated to size {}.",
-          truncatedSize,
-          e);
-    }
-
-    if (crashed) {
-      try (FileOutputStream outputStream = new FileOutputStream(file, true)) {
-        outputStream.getChannel().truncate(truncatedSize);
-      } catch (FileNotFoundException e) {
-        logger.debug("No modification has been written to this file");
-      } catch (IOException e) {
-        logger.error(
-            "An error occurred when truncating modifications to size {}.", 
truncatedSize, e);
+
+      @Override
+      public Modification next() {
+        if (cachedModification[0] == null) {
+          throw new NoSuchElementException();
+        }
+        Modification result = cachedModification[0];
+        cachedModification[0] = null;
+        return result;
       }
+    };
+  }
+
+  private boolean decodeModificationAndCache(
+      BufferedReader reader, Modification[] cachedModification, String line) 
throws IOException {
+    try {
+      cachedModification[0] = decodeModification(line);
+      return true;
+    } catch (IOException e) {
+      logger.warn("An error occurred when decode line-[{}] to modification", 
line);
+      cachedModification[0] = null;
+      reader.close();
+      return false;
     }
-    return modificationList;
   }
 
   @Override
@@ -114,23 +157,71 @@ public class LocalTextModificationAccessor
   }
 
   @Override
-  public void abort() throws IOException {
+  public void write(Modification mod) throws IOException {
     if (writer == null) {
       writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath, 
true);
     }
-    writer.write(ABORT_MARK);
+    writer.write(encodeModification(mod));
     writer.newLine();
     writer.flush();
   }
 
-  @Override
-  public void write(Modification mod) throws IOException {
+  @TestOnly
+  public void writeInComplete(Modification mod) throws IOException {
     if (writer == null) {
       writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath, 
true);
     }
-    writer.write(encodeModification(mod));
-    writer.newLine();
-    writer.flush();
+    String line = encodeModification(mod);
+    if (line != null) {
+      writer.write(line.substring(0, 2));
+    }
+  }
+
+  @TestOnly
+  public void writeMeetException(Modification mod) throws IOException {
+    if (writer == null) {
+      writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath, 
true);
+    }
+    writeInComplete(mod);
+    throw new IOException();
+  }
+
+  @Override
+  public void truncate(long size) {
+    try (FileOutputStream outputStream =
+        new 
FileOutputStream(FSFactoryProducer.getFSFactory().getFile(filePath), true)) {
+      outputStream.getChannel().truncate(size);
+      logger.warn("The modifications[{}] will be truncated to size {}.", 
filePath, size);
+    } catch (FileNotFoundException e) {
+      logger.debug(NO_MODIFICATION_MSG, filePath);
+    } catch (IOException e) {
+      logger.error(
+          "An error occurred when truncating modifications[{}] to size {}.", 
filePath, size, e);
+    }
+  }
+
+  @Override
+  public void mayTruncateLastLine() {
+    try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
+      long filePointer = file.length() - 1;
+      if (filePointer == 0) {
+        return;
+      }
+
+      file.seek(filePointer);
+      byte lastChar = file.readByte();
+      if (lastChar != '\n') {
+        while (filePointer > -1 && lastChar != '\n') {
+          file.seek(filePointer);
+          filePointer--;
+          lastChar = file.readByte();
+        }
+        logger.warn("The last line of Mods is incomplete, will be truncated");
+        truncate(filePointer + 2);
+      }
+    } catch (IOException e) {
+      logger.error("An error occurred when reading modifications", e);
+    }
   }
 
   private static String encodeModification(Modification mod) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationReader.java
index a10b9ca7161..2da82b13d34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationReader.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 
 /** ModificationReader reads all modifications from a persistent medium like 
file system. */
 public interface ModificationReader {
@@ -35,6 +36,14 @@ public interface ModificationReader {
    */
   Collection<Modification> read();
 
+  /**
+   * Get an iterator over this mod file, others keep consistence with {@link 
#read()}. Please ensure
+   * you have called hasNext() with return of {@code true} before calling 
next().
+   *
+   * @return the modification iterator.
+   */
+  Iterator<Modification> getModificationIterator();
+
   /** Release resources like streams. */
   void close() throws IOException;
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
index b59decbcd21..d2213d7d8ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
@@ -37,9 +37,10 @@ public interface ModificationWriter {
    */
   void write(Modification mod) throws IOException;
 
+  void truncate(long size);
+
+  void mayTruncateLastLine();
+
   /** Release resources like streams. */
   void close() throws IOException;
-
-  /** Abort last modification. Notice that after calling abort(), a fileWriter 
is opened. */
-  void abort() throws IOException;
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/utils/TracedBufferedReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/utils/TracedBufferedReader.java
deleted file mode 100644
index 9394a0cfeff..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/utils/TracedBufferedReader.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.storageengine.dataregion.modification.utils;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.UncheckedIOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- * Copied from {@link java.io.BufferedReader}, trace the read position by 
modifying the fill()
- * method.
- */
-public class TracedBufferedReader extends Reader {
-  private Reader in;
-
-  private char[] cb;
-  private int nChars;
-  private int nextChar;
-
-  private static final int INVALIDATED = -2;
-  private static final int UNMARKED = -1;
-  private int markedChar = UNMARKED;
-  private int readAheadLimit = 0; /* Valid only when markedChar > 0 */
-
-  /** If the next character is a line feed, skip it. */
-  private boolean skipLF = false;
-
-  /** The skipLF flag when the mark was set. */
-  private boolean markedSkipLF = false;
-
-  private static final int DEFAULT_CHAR_BUFFER_SIZE = 8192;
-  private static final int DEFAULT_EXPECTED_LINE_LENGTH = 80;
-
-  /** the total bytes number already filled into cb. */
-  private long totalFilledBytesNum = 0;
-
-  /**
-   * Creates a buffering character-input stream that uses an input buffer of 
the specified size.
-   *
-   * @param in A Reader
-   * @param sz Input-buffer size
-   * @exception IllegalArgumentException If {@code sz <= 0}
-   */
-  public TracedBufferedReader(Reader in, int sz) {
-    super(in);
-    if (sz <= 0) {
-      throw new IllegalArgumentException("Buffer size <= 0");
-    }
-    this.in = in;
-    cb = new char[sz];
-    nextChar = nChars = 0;
-  }
-
-  /**
-   * Creates a buffering character-input stream that uses a default-sized 
input buffer.
-   *
-   * @param in A Reader
-   */
-  public TracedBufferedReader(Reader in) {
-    this(in, DEFAULT_CHAR_BUFFER_SIZE);
-  }
-
-  /** Checks to make sure that the stream has not been closed. */
-  private void ensureOpen() throws IOException {
-    if (in == null) {
-      throw new IOException("Stream closed");
-    }
-  }
-
-  /** {@link BufferedReader#fill()} */
-  private void fill() throws IOException {
-    int dst;
-    if (markedChar <= UNMARKED) {
-      /* No mark */
-      dst = 0;
-    } else {
-      /* Marked */
-      int delta = nextChar - markedChar;
-      if (delta >= readAheadLimit) {
-        /* Gone past read-ahead limit: Invalidate mark. */
-        markedChar = INVALIDATED;
-        readAheadLimit = 0;
-        dst = 0;
-      } else {
-        if (readAheadLimit <= cb.length) {
-          /* Shuffle in the current buffer. */
-          System.arraycopy(cb, markedChar, cb, 0, delta);
-          markedChar = 0;
-          dst = delta;
-        } else {
-          /* Reallocate buffer to accommodate read-ahead limit. */
-          char[] ncb = new char[readAheadLimit];
-          System.arraycopy(cb, markedChar, ncb, 0, delta);
-          cb = ncb;
-          markedChar = 0;
-          dst = delta;
-        }
-        nextChar = nChars = delta;
-      }
-    }
-
-    int n;
-    do {
-      n = in.read(cb, dst, cb.length - dst);
-    } while (n == 0);
-    if (n > 0) {
-      nChars = dst + n;
-      nextChar = dst;
-      totalFilledBytesNum = totalFilledBytesNum + n;
-    }
-  }
-
-  /** {@link BufferedReader#read()} */
-  @Override
-  public int read() throws IOException {
-    synchronized (lock) {
-      ensureOpen();
-      for (; ; ) {
-        if (nextChar >= nChars) {
-          fill();
-          if (nextChar >= nChars) {
-            return -1;
-          }
-        }
-        if (skipLF) {
-          skipLF = false;
-          if (cb[nextChar] == '\n') {
-            nextChar++;
-            continue;
-          }
-        }
-        return cb[nextChar++];
-      }
-    }
-  }
-
-  /** {@link BufferedReader#read1(char[], int, int)} */
-  private int read1(char[] cbuf, int off, int len) throws IOException {
-    if (nextChar >= nChars) {
-      /* If the requested length is at least as large as the buffer, and
-      if there is no mark/reset activity, and if line feeds are not
-      being skipped, do not bother to copy the characters into the
-      local buffer.  In this way buffered streams will cascade
-      harmlessly. */
-      if (len >= cb.length && markedChar <= UNMARKED && !skipLF) {
-        return in.read(cbuf, off, len);
-      }
-      fill();
-    }
-    if (nextChar >= nChars) {
-      return -1;
-    }
-    if (skipLF) {
-      skipLF = false;
-      if (cb[nextChar] == '\n') {
-        nextChar++;
-        if (nextChar >= nChars) {
-          fill();
-        }
-        if (nextChar >= nChars) {
-          return -1;
-        }
-      }
-    }
-    int n = Math.min(len, nChars - nextChar);
-    System.arraycopy(cb, nextChar, cbuf, off, n);
-    nextChar += n;
-    return n;
-  }
-
-  /** {@link BufferedReader#read(char[], int, int)} */
-  @Override
-  public int read(char[] cbuf, int off, int len) throws IOException {
-    synchronized (lock) {
-      ensureOpen();
-      if ((off < 0)
-          || (off > cbuf.length)
-          || (len < 0)
-          || ((off + len) > cbuf.length)
-          || ((off + len) < 0)) {
-        throw new IndexOutOfBoundsException();
-      } else if (len == 0) {
-        return 0;
-      }
-
-      int n = read1(cbuf, off, len);
-      if (n <= 0) {
-        return n;
-      }
-      while ((n < len) && in.ready()) {
-        int n1 = read1(cbuf, off + n, len - n);
-        if (n1 <= 0) {
-          break;
-        }
-        n += n1;
-      }
-      return n;
-    }
-  }
-
-  /** {@link BufferedReader#readLine(boolean)} */
-  String readLine(boolean ignoreLF) throws IOException {
-    StringBuilder s = null;
-    int startChar;
-
-    synchronized (lock) {
-      ensureOpen();
-      boolean omitLF = ignoreLF || skipLF;
-
-      bufferLoop:
-      for (; ; ) {
-
-        if (nextChar >= nChars) {
-          fill();
-        }
-        if (nextChar >= nChars) {
-          /* EOF */
-          if (s != null && s.length() > 0) {
-            return s.toString();
-          } else {
-            return null;
-          }
-        }
-        boolean eol = false;
-        char c = 0;
-        int i;
-
-        /* Skip a leftover '\n', if necessary. */
-        if (omitLF && (cb[nextChar] == '\n')) {
-          nextChar++;
-        }
-        skipLF = false;
-        omitLF = false;
-
-        charLoop:
-        for (i = nextChar; i < nChars; i++) {
-          c = cb[i];
-          if ((c == '\n') || (c == '\r')) {
-            eol = true;
-            break charLoop;
-          }
-        }
-
-        startChar = nextChar;
-        nextChar = i;
-
-        if (eol) {
-          String str;
-          if (s == null) {
-            str = new String(cb, startChar, i - startChar);
-          } else {
-            s.append(cb, startChar, i - startChar);
-            str = s.toString();
-          }
-          nextChar++;
-          if (c == '\r') {
-            skipLF = true;
-            if (read() != -1) {
-              nextChar--;
-            }
-          }
-          return str;
-        }
-
-        if (s == null) {
-          s = new StringBuilder(DEFAULT_EXPECTED_LINE_LENGTH);
-        }
-        s.append(cb, startChar, i - startChar);
-      }
-    }
-  }
-
-  /** {@link BufferedReader#readLine()} */
-  public String readLine() throws IOException {
-    return readLine(false);
-  }
-
-  /** {@link BufferedReader#skip(long)} */
-  @Override
-  public long skip(long n) throws IOException {
-    if (n < 0L) {
-      throw new IllegalArgumentException("skip value is negative");
-    }
-    synchronized (lock) {
-      ensureOpen();
-      long r = n;
-      while (r > 0) {
-        if (nextChar >= nChars) {
-          fill();
-        }
-        if (nextChar >= nChars) {
-          /* EOF */
-          break;
-        }
-        if (skipLF) {
-          skipLF = false;
-          if (cb[nextChar] == '\n') {
-            nextChar++;
-          }
-        }
-        long d = (long) nChars - nextChar;
-        if (r <= d) {
-          nextChar += r;
-          r = 0;
-          break;
-        } else {
-          r -= d;
-          nextChar = nChars;
-        }
-      }
-      return n - r;
-    }
-  }
-
-  /** {@link BufferedReader#ready()} */
-  @Override
-  public boolean ready() throws IOException {
-    synchronized (lock) {
-      ensureOpen();
-
-      /*
-       * If newline needs to be skipped and the next char to be read
-       * is a newline character, then just skip it right away.
-       */
-      if (skipLF) {
-        /* Note that in.ready() will return true if and only if the next
-         * read on the stream will not block.
-         */
-        if (nextChar >= nChars && in.ready()) {
-          fill();
-        }
-        if (nextChar < nChars) {
-          if (cb[nextChar] == '\n') {
-            nextChar++;
-          }
-          skipLF = false;
-        }
-      }
-      return (nextChar < nChars) || in.ready();
-    }
-  }
-
-  /** {@link BufferedReader#markSupported()} */
-  @Override
-  public boolean markSupported() {
-    return true;
-  }
-
-  /** {@link BufferedReader#mark(int)} */
-  @Override
-  public void mark(int readAheadLimit) throws IOException {
-    if (readAheadLimit < 0) {
-      throw new IllegalArgumentException("Read-ahead limit < 0");
-    }
-    synchronized (lock) {
-      ensureOpen();
-      this.readAheadLimit = readAheadLimit;
-      markedChar = nextChar;
-      markedSkipLF = skipLF;
-    }
-  }
-
-  /** {@link BufferedReader#reset()} */
-  @Override
-  public void reset() throws IOException {
-    synchronized (lock) {
-      ensureOpen();
-      if (markedChar < 0) {
-        throw new IOException((markedChar == INVALIDATED) ? "Mark invalid" : 
"Stream not marked");
-      }
-      nextChar = markedChar;
-      skipLF = markedSkipLF;
-    }
-  }
-
-  /** {@link BufferedReader#close()} */
-  @Override
-  public void close() throws IOException {
-    synchronized (lock) {
-      if (in == null) {
-        return;
-      }
-      try {
-        in.close();
-      } finally {
-        in = null;
-        cb = null;
-      }
-    }
-  }
-
-  /** {@link BufferedReader#lines()} */
-  public Stream<String> lines() {
-    Iterator<String> iter =
-        new Iterator<String>() {
-          String nextLine = null;
-
-          @Override
-          public boolean hasNext() {
-            if (nextLine != null) {
-              return true;
-            } else {
-              try {
-                nextLine = readLine();
-                return (nextLine != null);
-              } catch (IOException e) {
-                throw new UncheckedIOException(e);
-              }
-            }
-          }
-
-          @Override
-          public String next() {
-            if (nextLine != null || hasNext()) {
-              String line = nextLine;
-              nextLine = null;
-              return line;
-            } else {
-              throw new NoSuchElementException();
-            }
-          }
-        };
-    return StreamSupport.stream(
-        Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED | 
Spliterator.NONNULL),
-        false);
-  }
-
-  /**
-   * Returns this reader's file position.
-   *
-   * @return This reader's file position, a non-negative integer counting the 
number of bytes from
-   *     the beginning of the file to the current position
-   */
-  public long position() {
-    // position = totalFilledBytesNum - lastFilledBytesNum + 
readOffsetInLastFilledBytes
-    // lastFilledBytesNum = nChars - dst, readOffsetInLastFilledBytes = 
nextChar - dst
-    return totalFilledBytesNum - nChars + nextChar;
-  }
-}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
index a6bbe3e226e..ad6625b2bc3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.storageengine.dataregion.modification;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.io.LocalTextModificationAccessor;
 import org.apache.iotdb.db.utils.constant.TestConstant;
 
 import org.junit.Assert;
@@ -72,33 +73,25 @@ public class ModificationFileTest {
   }
 
   @Test
-  public void testAbort() {
+  public void writeVerifyTest() {
     String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
     Modification[] modifications =
         new Modification[] {
           new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
-          new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
-          new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3, 4),
-          new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4, 5),
+          new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2)
         };
     try (ModificationFile mFile = new ModificationFile(tempFileName)) {
-      for (int i = 0; i < 2; i++) {
-        mFile.write(modifications[i]);
+      mFile.write(modifications[0]);
+      try (LocalTextModificationAccessor accessor =
+          new LocalTextModificationAccessor(tempFileName)) {
+        accessor.writeInComplete(modifications[1]);
       }
+      mFile.write(modifications[1]);
       List<Modification> modificationList = (List<Modification>) 
mFile.getModifications();
+      assertEquals(2, modificationList.size());
       for (int i = 0; i < 2; i++) {
         assertEquals(modifications[i], modificationList.get(i));
       }
-
-      for (int i = 2; i < 4; i++) {
-        mFile.write(modifications[i]);
-      }
-      modificationList = (List<Modification>) mFile.getModifications();
-      mFile.abort();
-
-      for (int i = 0; i < 3; i++) {
-        assertEquals(modifications[i], modificationList.get(i));
-      }
     } catch (IOException e) {
       fail(e.getMessage());
     } finally {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessorTest.java
index 136531eb238..e40f3c7bcc1 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessorTest.java
@@ -24,11 +24,10 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.utils.constant.TestConstant;
 
+import org.junit.AfterClass;
 import org.junit.Test;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -38,17 +37,53 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class LocalTextModificationAccessorTest {
+  private static Modification[] modifications =
+      new Modification[] {
+        new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
+        new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
+        new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
+        new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
+      };
+
+  @AfterClass
+  public static void tearDown() {
+    modifications = null;
+  }
+
+  @Test
+  public void writeMeetException() throws IOException {
+    String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
+    long length = 0;
+    LocalTextModificationAccessor accessor = null;
+    try {
+      accessor = new LocalTextModificationAccessor(tempFileName);
+      for (int i = 0; i < 2; i++) {
+        accessor.write(modifications[i]);
+      }
+      length = new File(tempFileName).length();
+      // the current line should be truncated when meet exception
+      accessor.writeMeetException(modifications[2]);
+      for (int i = 2; i < 4; i++) {
+        accessor.write(modifications[i]);
+      }
+      List<Modification> modificationList = (List<Modification>) 
accessor.read();
+      assertEquals(4, modificationList.size());
+      for (int i = 0; i < 4; i++) {
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+    } catch (IOException e) {
+      accessor.truncate(length);
+    } finally {
+      if (accessor != null) {
+        accessor.close();
+      }
+      new File(tempFileName).delete();
+    }
+  }
 
   @Test
   public void readMyWrite() {
     String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
-    Modification[] modifications =
-        new Modification[] {
-          new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
-          new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
-          new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
-          new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
-        };
     try (LocalTextModificationAccessor accessor = new 
LocalTextModificationAccessor(tempFileName)) {
       for (int i = 0; i < 2; i++) {
         accessor.write(modifications[i]);
@@ -75,52 +110,41 @@ public class LocalTextModificationAccessorTest {
   @Test
   public void readNull() {
     String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
-    LocalTextModificationAccessor accessor = new 
LocalTextModificationAccessor(tempFileName);
-    new File(tempFileName).delete();
-    Collection<Modification> modifications = accessor.read();
-    assertEquals(new ArrayList<>(), modifications);
+    try (LocalTextModificationAccessor accessor = new 
LocalTextModificationAccessor(tempFileName)) {
+      new File(tempFileName).delete();
+      Collection<Modification> modifications = accessor.read();
+      assertEquals(new ArrayList<>(), modifications);
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
   }
 
   @Test
-  public void readAndTruncate() {
+  public void readMeetError() {
     String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
     File file = new File(tempFileName);
     if (file.exists()) {
       file.delete();
     }
-    Modification[] modifications =
-        new Modification[] {
-          new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
-          new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
-          new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
-          new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
-        };
-    try (LocalTextModificationAccessor accessor = new 
LocalTextModificationAccessor(tempFileName);
-        BufferedWriter writer = new BufferedWriter(new 
FileWriter(tempFileName, true))) {
+    try (LocalTextModificationAccessor accessor = new 
LocalTextModificationAccessor(tempFileName)) {
       // write normal message
-      for (int i = 0; i < 2; i++) {
+      for (int i = 0; i < 4; i++) {
         accessor.write(modifications[i]);
       }
       List<Modification> modificationList = (List<Modification>) 
accessor.read();
-      for (int i = 0; i < 2; i++) {
+      for (int i = 0; i < 4; i++) {
         assertEquals(modifications[i], modificationList.get(i));
       }
       // write error message
-      long length = file.length();
-      writer.write("error");
-      writer.newLine();
-      writer.flush();
-      // write normal message & read
-      for (int i = 2; i < 4; i++) {
-        accessor.write(modifications[i]);
-      }
+      accessor.writeInComplete(modifications[0]);
+
       modificationList = (List<Modification>) accessor.read();
-      for (int i = 0; i < 2; i++) {
+      // the error line is ignored
+      assertEquals(4, modificationList.size());
+      for (int i = 0; i < 4; i++) {
         System.out.println(modificationList);
         assertEquals(modifications[i], modificationList.get(i));
       }
-      // check truncated file
-      assertEquals(length, file.length());
     } catch (IOException e) {
       fail(e.getMessage());
     } finally {


Reply via email to