This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 43b3c40aeee0eaa935a0a37b85fe64ae6821dad7
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed May 17 17:08:55 2023 +0800

    tmp save 0517
---
 .../apache/iotdb/os/cache/OSFileCacheValue.java    |  22 ++--
 .../org/apache/iotdb/os/cache/OSFileChannel.java   | 126 +++++++++++++--------
 2 files changed, 91 insertions(+), 57 deletions(-)

diff --git 
a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java 
b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 1eed2f6e72..ab9e471890 100644
--- 
a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ 
b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -33,9 +33,9 @@ public class OSFileCacheValue {
   /** cache key size */
   private int metaSize;
   /** start position in the remote TsFile */
-  private long startPositionInTsFile;
+  private long startPositionInOSFile;
   /** start position in the remote TsFile */
-  private long endPositionInTsFile;
+  private long endPositionInOSFile;
 
   private boolean shouldDelete;
   private int readCnt;
@@ -45,13 +45,13 @@ public class OSFileCacheValue {
       long startPositionInCacheFile,
       int metaSize,
       int dataSize,
-      long startPositionInTsFile) {
+      long startPositionInOSFile) {
     this.cacheFile = cacheFile;
     this.startPositionInCacheFile = startPositionInCacheFile;
     this.metaSize = metaSize;
     this.dataSize = dataSize;
-    this.startPositionInTsFile = startPositionInTsFile;
-    this.endPositionInTsFile = startPositionInTsFile + dataSize;
+    this.startPositionInOSFile = startPositionInOSFile;
+    this.endPositionInOSFile = startPositionInOSFile + dataSize;
   }
 
   public File getCacheFile() {
@@ -76,12 +76,12 @@ public class OSFileCacheValue {
     return metaSize + dataSize;
   }
 
-  public long getStartPositionInTsFile() {
-    return startPositionInTsFile;
+  public long getStartPositionInOSFile() {
+    return startPositionInOSFile;
   }
 
-  public long getEndPositionInTsFile() {
-    return endPositionInTsFile;
+  public long getEndPositionInOSFile() {
+    return endPositionInOSFile;
   }
 
   public long getEndPositionInCacheFile() {
@@ -93,10 +93,10 @@ public class OSFileCacheValue {
    * the position is outside the cache file range
    */
   public long convertTsFilePos2CachePos(long positionInTsFile) {
-    if (positionInTsFile < startPositionInTsFile || positionInTsFile >= 
endPositionInTsFile) {
+    if (positionInTsFile < startPositionInOSFile || positionInTsFile >= 
endPositionInOSFile) {
       return -1;
     }
-    return startPositionInCacheFile + metaSize + (positionInTsFile - 
startPositionInTsFile);
+    return startPositionInCacheFile + metaSize + (positionInTsFile - 
startPositionInOSFile);
   }
 
   /** Mark this value should be deleted, delete this value when no one is 
reading it. */
diff --git 
a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java 
b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index b5cee34dcc..b01e9c6a01 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -40,35 +40,26 @@ public class OSFileChannel implements Closeable {
   private static final OSFileCache cache = OSFileCache.getInstance();
   private final OSFile osFile;
   private long position = 0;
-  private OSFileCacheValue cacheFile;
-  private FileChannel cacheFileChannel;
+
+  private OSFileBlock currentOSFileBlock;
 
   public OSFileChannel(OSFile osFile) throws IOException {
     this.osFile = osFile;
-    openNextCacheFile();
   }
 
   public static InputStream newInputStream(OSFileChannel channel) {
     return new OSInputStream(channel);
   }
 
-  private boolean isPositionValid() {
-    return cacheFile.getStartPositionInTsFile() <= position
-        && position < cacheFile.getEndPositionInTsFile();
-  }
-
-  private void openNextCacheFile() throws IOException {
+  private void openNextCacheFile(int position) throws IOException {
     // close prev cache file
-    close();
+    closeCurrentOSFileBlock();
     // open next cache file
-    OSFileCacheKey key = locateCacheFileFromPosition();
-    while (!cacheFile.tryReadLock()) {
-      cacheFile = cache.get(key);
-    }
-    cacheFileChannel = FileChannel.open(cacheFile.getCacheFile().toPath(), 
StandardOpenOption.READ);
+    OSFileCacheKey key = locateCacheFileFromPosition(position);
+    currentOSFileBlock = new OSFileBlock(key);
   }
 
-  private OSFileCacheKey locateCacheFileFromPosition() throws IOException {
+  private OSFileCacheKey locateCacheFileFromPosition(int position) throws 
IOException {
     if (position >= size()) {
       throw new IOException("EOF");
     }
@@ -84,19 +75,26 @@ public class OSFileChannel implements Closeable {
     return position;
   }
 
-  public void position(long newPosition) {
+  public synchronized void position(long newPosition) {
     if (newPosition < 0) {
       throw new IllegalArgumentException();
     }
     position = newPosition;
   }
 
-  public int read(ByteBuffer dst) throws IOException {
-    return read(dst, position);
+  public synchronized int read(ByteBuffer dst) throws IOException {
+    int readSize = read(dst, position);
+    position(position + readSize);
+    return readSize;
   }
 
-  public int read(ByteBuffer dst, long position) throws IOException {
+  public synchronized int read(ByteBuffer dst, long position) throws 
IOException {
+    int currentPosition = (int) position;
     dst.mark();
+    int dstLimit = dst.limit();
+    // read each cache file
+    int totalReadBytes = 0;
+
     // determiner the ead range
     long startPos = position;
     long endPos = position + dst.remaining();
@@ -106,41 +104,77 @@ public class OSFileChannel implements Closeable {
     if (endPos > size()) {
       endPos = size();
     }
-    // read each cache file
-    int totalReadBytes = 0;
-    while (startPos < endPos) {
-      if (!isPositionValid()) {
-        openNextCacheFile();
-      }
-      long readStartPosition = cacheFile.convertTsFilePos2CachePos(startPos);
-      long readEndPosition = cacheFile.convertTsFilePos2CachePos(endPos);
-      if (readEndPosition < 0) {
-        readEndPosition = cacheFile.getEndPositionInCacheFile();
-      }
-      int readSize = (int) (readEndPosition - readStartPosition);
-      cacheFileChannel.position(readStartPosition);
-      long read = cacheFileChannel.read(new ByteBuffer[] {dst}, 0, readSize);
-      if (read != readSize) {
-        dst.reset();
-        throw new IOException(
-            String.format(
-                "Cache file %s may crash because cannot read enough 
information in the cash file.",
-                osFile));
+    try {
+      while (startPos < endPos) {
+        if (currentOSFileBlock == null || 
!currentOSFileBlock.canRead(startPos)) {
+          openNextCacheFile(currentPosition);
+        }
+        int readSize = currentOSFileBlock.read(dst, startPos, endPos);
+        totalReadBytes += readSize;
+        startPos += readSize;
+        currentPosition += readSize;
       }
-      totalReadBytes += read;
-      startPos += read;
+    } catch (IOException e) {
+      dst.reset();
+      throw e;
+    } finally {
+      dst.limit(dstLimit);
     }
-    this.position = position + totalReadBytes;
     return totalReadBytes;
   }
 
+  private void closeCurrentOSFileBlock() throws IOException {
+    if (currentOSFileBlock != null) {
+      currentOSFileBlock.close();
+    }
+  }
+
   @Override
   public void close() throws IOException {
-    if (cacheFile != null) {
+    closeCurrentOSFileBlock();
+  }
+
+  private static class OSFileBlock {
+    private OSFileCacheValue cacheValue;
+    private FileChannel fileChannel;
+
+    public OSFileBlock(OSFileCacheKey cacheKey) throws IOException {
+      do {
+        cacheValue = cache.get(cacheKey);
+      } while (!cacheValue.tryReadLock());
+      fileChannel = FileChannel.open(cacheValue.getCacheFile().toPath(), 
StandardOpenOption.READ);
+    }
+
+    public boolean canRead(long positionInOSFile) {
+      return cacheValue.getStartPositionInOSFile() <= positionInOSFile
+          && positionInOSFile < cacheValue.getEndPositionInOSFile();
+    }
+
+    public int read(ByteBuffer dst, long startPos, long endPos) throws 
IOException {
+      long readStartPosition = cacheValue.convertTsFilePos2CachePos(startPos);
+      long expectedReadLength = endPos - startPos;
+
+      int readSize =
+          (int)
+              Math.min(
+                  expectedReadLength, cacheValue.getEndPositionInCacheFile() - 
readStartPosition);
+
+      dst.limit(dst.position() + readSize);
+      long actualReadSize = fileChannel.read(dst, readStartPosition);
+      if (actualReadSize != readSize) {
+        throw new IOException(
+            String.format(
+                "Cache file %s may crash because cannot read enough 
information in the cash file.",
+                cacheValue.getCacheFile()));
+      }
+      return readSize;
+    }
+
+    public void close() throws IOException {
       try {
-        cacheFileChannel.close();
+        fileChannel.close();
       } finally {
-        cacheFile.readUnlock();
+        cacheValue.readUnlock();
       }
     }
   }

Reply via email to