This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch tiered_storage_0517 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9a08773bf5d080a90fffe20e5c575f21963dded1 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed May 17 15:16:40 2023 +0800 change some logic in OSFileChannel --- .../apache/iotdb/os/cache/CacheFileManager.java | 4 +- .../apache/iotdb/os/cache/CacheInputStream.java | 4 +- .../apache/iotdb/os/cache/CacheRecoverTask.java | 3 +- .../apache/iotdb/os/cache/OSFileCacheValue.java | 50 +++++++++++++++---- .../{CacheFileChannel.java => OSFileChannel.java} | 56 ++++++++++------------ .../apache/iotdb/os/fileSystem/OSTsFileInput.java | 8 ++-- 6 files changed, 77 insertions(+), 48 deletions(-) diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java index cb88c0771c..5c74076fce 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java @@ -75,7 +75,9 @@ public class CacheFileManager { ByteBuffer meta = key.serialize(); channel.write(meta); channel.write(ByteBuffer.wrap(data)); - res = new OSFileCacheValue(tmpCacheFile, 0, meta.capacity(), data.length); + res = + new OSFileCacheValue( + tmpCacheFile, 0, key.getStartPosition(), meta.capacity(), data.length); } catch (IOException e) { logger.error("Fail to persist data to cache file {}", tmpCacheFile, e); tmpCacheFile.delete(); diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java index 7fcdec4bf8..2701a6d6cd 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java @@ -23,9 +23,9 @@ import java.io.InputStream; import java.nio.ByteBuffer; public class CacheInputStream extends InputStream { - private final CacheFileChannel channel; + private final OSFileChannel channel; - public CacheInputStream(CacheFileChannel channel) { + public CacheInputStream(OSFileChannel channel) { super(); this.channel = channel; } diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java index bfebeaadf1..c64a002bd5 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java @@ -70,7 +70,8 @@ public class CacheRecoverTask implements Runnable { cacheFile.delete(); continue; } - OSFileCacheValue value = new OSFileCacheValue(cacheFile, 0, metaSize, dataSize); + OSFileCacheValue value = + new OSFileCacheValue(cacheFile, 0, key.getStartPosition(), metaSize, dataSize); cache.put(key, value); } // update max cache file id diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java index 85295506c5..65696cd973 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java @@ -19,15 +19,25 @@ package org.apache.iotdb.os.cache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; public class OSFileCacheValue { + private static final Logger logger = LoggerFactory.getLogger(OSFileCacheValue.class); /** local cache file */ private File cacheFile; // 如果每个块用一个文件来存储,则该值一直为 0 // 如果使用一个大文件存储所有块,则该值为大文件中的起点 /** start position in the local cache file */ - private long startPosition; + private long startPositionInCacheFile; + + private long startPositionOfOSFile; /** cache data size */ private int dataSize; /** cache key size */ @@ -36,9 +46,13 @@ public class OSFileCacheValue { private boolean shouldDelete; private int readCnt; - public OSFileCacheValue(File cacheFile, long startPosition, int metaSize, int dataSize) { + private FileChannel cacheFileChannel; + + public OSFileCacheValue( + File cacheFile, long startPosition, long startPositionOfOSFile, int metaSize, int dataSize) { this.cacheFile = cacheFile; - this.startPosition = startPosition; + this.startPositionInCacheFile = startPosition; + this.startPositionOfOSFile = startPositionOfOSFile; this.metaSize = metaSize; this.dataSize = dataSize; } @@ -47,8 +61,8 @@ public class OSFileCacheValue { return cacheFile; } - public long getStartPosition() { - return startPosition; + public long getStartPositionInCacheFile() { + return startPositionInCacheFile; } public int getMetaSize() { @@ -65,6 +79,15 @@ public class OSFileCacheValue { return metaSize + dataSize; } + public boolean containsPosition(long position) { + return startPositionInCacheFile <= position && position < startPositionInCacheFile + dataSize; + } + + public int read(ByteBuffer dst, long startPosition) throws IOException { + long startPosInCacheFile = metaSize + (startPosition - this.startPositionOfOSFile); + return cacheFileChannel.read(dst, startPosInCacheFile); + } + /** Mark this value should be deleted, delete this value when no one is reading it. */ public synchronized void setShouldDelete() { this.shouldDelete = true; @@ -77,11 +100,14 @@ public class OSFileCacheValue { * Try to get the read lock, return false when this cache value should be deleted or has been * deleted. */ - public synchronized boolean tryReadLock() { + public synchronized boolean tryReadLock() throws IOException { if (shouldDelete || !cacheFile.exists()) { return false; } else { this.readCnt++; + if (!cacheFileChannel.isOpen()) { + cacheFileChannel = FileChannel.open(cacheFile.toPath(), StandardOpenOption.READ); + } return true; } } @@ -90,10 +116,18 @@ public class OSFileCacheValue { * Release the read lock, delete the cache value when no one else is reading it and this cache * value should be deleted. */ - public synchronized void readUnlock() { + public synchronized void readUnlock() throws IOException { this.readCnt--; + // delete the cache file only when no reference is used if (shouldDelete && readCnt == 0) { - cacheFile.delete(); + boolean success = cacheFile.delete(); + if (!success) { + logger.error("[OSFileCache] cannot delete cache file {}", cacheFile); + } + } + // close the file channel if no reference is used + if (readCnt == 0) { + cacheFileChannel.close(); } } } diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java similarity index 67% rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java rename to object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java index f094479037..5bc9bbe4eb 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java @@ -30,45 +30,37 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.StandardOpenOption; -public class CacheFileChannel implements Closeable { +public class OSFileChannel implements Closeable { private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class); private static final ObjectStorageConfig config = ObjectStorageDescriptor.getInstance().getConfig(); private static final OSFileCache cache = OSFileCache.getInstance(); private final OSFile osFile; private long position = 0; - private OSFileCacheValue currentCacheFile; - private FileChannel cacheFileChannel; - private long startPositionInTsFile = position; - private long endPositionInTsFile = position + config.getCachePageSize(); + private OSFileCacheValue currentOSFileBlock; - public CacheFileChannel(OSFile osFile) { + public OSFileChannel(OSFile osFile) { this.osFile = osFile; } - public static InputStream newInputStream(CacheFileChannel channel) { + public static InputStream newInputStream(OSFileChannel channel) { return new CacheInputStream(channel); } - private boolean isPositionValid() { - return startPositionInTsFile <= position && position < endPositionInTsFile; + private boolean canReadFromCurrentCacheBlock(long startPos) { + return currentOSFileBlock != null && currentOSFileBlock.containsPosition(startPos); } private void openNextCacheFile() throws IOException { // close prev cache file - close(); + releaseCurrentOSFileBlock(); // open next cache file OSFileCacheKey key = locateCacheFileFromPosition(); - while (!currentCacheFile.tryReadLock()) { - currentCacheFile = cache.get(key); + // 用 while 是为了防止从 cache 中拿出来之后,对应的 value 又被挤出去,导致对应的文件被删除? + while (currentOSFileBlock == null || !currentOSFileBlock.tryReadLock()) { + currentOSFileBlock = cache.get(key); } - cacheFileChannel = - FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ); - startPositionInTsFile = currentCacheFile.getStartPosition(); - endPositionInTsFile = startPositionInTsFile + currentCacheFile.getLength(); } private OSFileCacheKey locateCacheFileFromPosition() { @@ -108,17 +100,15 @@ public class CacheFileChannel implements Closeable { // read each cache file int totalReadBytes = 0; while (startPos < endPos) { - if (!isPositionValid()) { + if (!canReadFromCurrentCacheBlock(startPos)) { openNextCacheFile(); } - long readStartPosition = currentCacheFile.getMetaSize() + (startPos - startPositionInTsFile); - long readEndPosition = - currentCacheFile.getMetaSize() - + (Math.min(endPos, endPositionInTsFile) - startPositionInTsFile); - int readSize = (int) (readEndPosition - readStartPosition); - dst.limit(dst.position() + readSize); - int read = cacheFileChannel.read(dst, readStartPosition); - if (read != readSize) { + + int maxReadSize = (int) Math.min(endPos - startPos, currentOSFileBlock.getDataSize()); + dst.limit(dst.position() + maxReadSize); + + int read = currentOSFileBlock.read(dst, startPos); + if (read != maxReadSize) { throw new IOException( String.format( "Cache file %s may crash because cannot read enough information in the cash file.", @@ -131,12 +121,14 @@ public class CacheFileChannel implements Closeable { return totalReadBytes; } + public void releaseCurrentOSFileBlock() throws IOException { + if (currentOSFileBlock != null) { + currentOSFileBlock.readUnlock(); + } + } + @Override public void close() throws IOException { - try { - cacheFileChannel.close(); - } finally { - currentCacheFile.readUnlock(); - } + releaseCurrentOSFileBlock(); } } diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java index 07a85e75fe..0dcd600285 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.os.fileSystem; -import org.apache.iotdb.os.cache.CacheFileChannel; +import org.apache.iotdb.os.cache.OSFileChannel; import org.apache.iotdb.tsfile.read.reader.TsFileInput; import java.io.IOException; @@ -27,11 +27,11 @@ import java.nio.ByteBuffer; public class OSTsFileInput implements TsFileInput { private OSFile file; - private CacheFileChannel channel; + private OSFileChannel channel; public OSTsFileInput(OSFile file) { this.file = file; - this.channel = new CacheFileChannel(file); + this.channel = new OSFileChannel(file); } @Override @@ -62,7 +62,7 @@ public class OSTsFileInput implements TsFileInput { @Override public InputStream wrapAsInputStream() throws IOException { - return CacheFileChannel.newInputStream(channel); + return OSFileChannel.newInputStream(channel); } @Override
