This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch tiered_storage in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 43b3c40aeee0eaa935a0a37b85fe64ae6821dad7 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed May 17 17:08:55 2023 +0800 tmp save 0517 --- .../apache/iotdb/os/cache/OSFileCacheValue.java | 22 ++-- .../org/apache/iotdb/os/cache/OSFileChannel.java | 126 +++++++++++++-------- 2 files changed, 91 insertions(+), 57 deletions(-) diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java index 1eed2f6e72..ab9e471890 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java @@ -33,9 +33,9 @@ public class OSFileCacheValue { /** cache key size */ private int metaSize; /** start position in the remote TsFile */ - private long startPositionInTsFile; + private long startPositionInOSFile; /** start position in the remote TsFile */ - private long endPositionInTsFile; + private long endPositionInOSFile; private boolean shouldDelete; private int readCnt; @@ -45,13 +45,13 @@ public class OSFileCacheValue { long startPositionInCacheFile, int metaSize, int dataSize, - long startPositionInTsFile) { + long startPositionInOSFile) { this.cacheFile = cacheFile; this.startPositionInCacheFile = startPositionInCacheFile; this.metaSize = metaSize; this.dataSize = dataSize; - this.startPositionInTsFile = startPositionInTsFile; - this.endPositionInTsFile = startPositionInTsFile + dataSize; + this.startPositionInOSFile = startPositionInOSFile; + this.endPositionInOSFile = startPositionInOSFile + dataSize; } public File getCacheFile() { @@ -76,12 +76,12 @@ public class OSFileCacheValue { return metaSize + dataSize; } - public long getStartPositionInTsFile() { - return startPositionInTsFile; + public long getStartPositionInOSFile() { + return startPositionInOSFile; } - public long getEndPositionInTsFile() { - return endPositionInTsFile; + public long getEndPositionInOSFile() { + return endPositionInOSFile; } public long getEndPositionInCacheFile() { @@ -93,10 +93,10 @@ public class OSFileCacheValue { * the position is outside the cache file range */ public long convertTsFilePos2CachePos(long positionInTsFile) { - if (positionInTsFile < startPositionInTsFile || positionInTsFile >= endPositionInTsFile) { + if (positionInTsFile < startPositionInOSFile || positionInTsFile >= endPositionInOSFile) { return -1; } - return startPositionInCacheFile + metaSize + (positionInTsFile - startPositionInTsFile); + return startPositionInCacheFile + metaSize + (positionInTsFile - startPositionInOSFile); } /** Mark this value should be deleted, delete this value when no one is reading it. */ diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java index b5cee34dcc..b01e9c6a01 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java @@ -40,35 +40,26 @@ public class OSFileChannel implements Closeable { private static final OSFileCache cache = OSFileCache.getInstance(); private final OSFile osFile; private long position = 0; - private OSFileCacheValue cacheFile; - private FileChannel cacheFileChannel; + + private OSFileBlock currentOSFileBlock; public OSFileChannel(OSFile osFile) throws IOException { this.osFile = osFile; - openNextCacheFile(); } public static InputStream newInputStream(OSFileChannel channel) { return new OSInputStream(channel); } - private boolean isPositionValid() { - return cacheFile.getStartPositionInTsFile() <= position - && position < cacheFile.getEndPositionInTsFile(); - } - - private void openNextCacheFile() throws IOException { + private void openNextCacheFile(int position) throws IOException { // close prev cache file - close(); + closeCurrentOSFileBlock(); // open next cache file - OSFileCacheKey key = locateCacheFileFromPosition(); - while (!cacheFile.tryReadLock()) { - cacheFile = cache.get(key); - } - cacheFileChannel = FileChannel.open(cacheFile.getCacheFile().toPath(), StandardOpenOption.READ); + OSFileCacheKey key = locateCacheFileFromPosition(position); + currentOSFileBlock = new OSFileBlock(key); } - private OSFileCacheKey locateCacheFileFromPosition() throws IOException { + private OSFileCacheKey locateCacheFileFromPosition(int position) throws IOException { if (position >= size()) { throw new IOException("EOF"); } @@ -84,19 +75,26 @@ public class OSFileChannel implements Closeable { return position; } - public void position(long newPosition) { + public synchronized void position(long newPosition) { if (newPosition < 0) { throw new IllegalArgumentException(); } position = newPosition; } - public int read(ByteBuffer dst) throws IOException { - return read(dst, position); + public synchronized int read(ByteBuffer dst) throws IOException { + int readSize = read(dst, position); + position(position + readSize); + return readSize; } - public int read(ByteBuffer dst, long position) throws IOException { + public synchronized int read(ByteBuffer dst, long position) throws IOException { + int currentPosition = (int) position; dst.mark(); + int dstLimit = dst.limit(); + // read each cache file + int totalReadBytes = 0; + // determiner the ead range long startPos = position; long endPos = position + dst.remaining(); @@ -106,41 +104,77 @@ public class OSFileChannel implements Closeable { if (endPos > size()) { endPos = size(); } - // read each cache file - int totalReadBytes = 0; - while (startPos < endPos) { - if (!isPositionValid()) { - openNextCacheFile(); - } - long readStartPosition = cacheFile.convertTsFilePos2CachePos(startPos); - long readEndPosition = cacheFile.convertTsFilePos2CachePos(endPos); - if (readEndPosition < 0) { - readEndPosition = cacheFile.getEndPositionInCacheFile(); - } - int readSize = (int) (readEndPosition - readStartPosition); - cacheFileChannel.position(readStartPosition); - long read = cacheFileChannel.read(new ByteBuffer[] {dst}, 0, readSize); - if (read != readSize) { - dst.reset(); - throw new IOException( - String.format( - "Cache file %s may crash because cannot read enough information in the cash file.", - osFile)); + try { + while (startPos < endPos) { + if (currentOSFileBlock == null || !currentOSFileBlock.canRead(startPos)) { + openNextCacheFile(currentPosition); + } + int readSize = currentOSFileBlock.read(dst, startPos, endPos); + totalReadBytes += readSize; + startPos += readSize; + currentPosition += readSize; } - totalReadBytes += read; - startPos += read; + } catch (IOException e) { + dst.reset(); + throw e; + } finally { + dst.limit(dstLimit); } - this.position = position + totalReadBytes; return totalReadBytes; } + private void closeCurrentOSFileBlock() throws IOException { + if (currentOSFileBlock != null) { + currentOSFileBlock.close(); + } + } + @Override public void close() throws IOException { - if (cacheFile != null) { + closeCurrentOSFileBlock(); + } + + private static class OSFileBlock { + private OSFileCacheValue cacheValue; + private FileChannel fileChannel; + + public OSFileBlock(OSFileCacheKey cacheKey) throws IOException { + do { + cacheValue = cache.get(cacheKey); + } while (!cacheValue.tryReadLock()); + fileChannel = FileChannel.open(cacheValue.getCacheFile().toPath(), StandardOpenOption.READ); + } + + public boolean canRead(long positionInOSFile) { + return cacheValue.getStartPositionInOSFile() <= positionInOSFile + && positionInOSFile < cacheValue.getEndPositionInOSFile(); + } + + public int read(ByteBuffer dst, long startPos, long endPos) throws IOException { + long readStartPosition = cacheValue.convertTsFilePos2CachePos(startPos); + long expectedReadLength = endPos - startPos; + + int readSize = + (int) + Math.min( + expectedReadLength, cacheValue.getEndPositionInCacheFile() - readStartPosition); + + dst.limit(dst.position() + readSize); + long actualReadSize = fileChannel.read(dst, readStartPosition); + if (actualReadSize != readSize) { + throw new IOException( + String.format( + "Cache file %s may crash because cannot read enough information in the cash file.", + cacheValue.getCacheFile())); + } + return readSize; + } + + public void close() throws IOException { try { - cacheFileChannel.close(); + fileChannel.close(); } finally { - cacheFile.readUnlock(); + cacheValue.readUnlock(); } } }
