This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 470525e81c add cache test
470525e81c is described below
commit 470525e81c9aba6ba7f31479f1e87661ce9b86a2
Author: HeimingZ <[email protected]>
AuthorDate: Thu May 18 17:27:54 2023 +0800
add cache test
---
object-storage/pom.xml | 5 +
.../apache/iotdb/os/cache/CacheFileManager.java | 46 +++++---
.../org/apache/iotdb/os/cache/OSFileCache.java | 26 ++---
.../org/apache/iotdb/os/cache/OSFileChannel.java | 5 +-
.../apache/iotdb/os/conf/ObjectStorageConfig.java | 14 ++-
.../org/apache/iotdb/os/fileSystem/OSFile.java | 44 ++++----
.../iotdb/os/io/aws/S3ObjectStorageConnector.java | 14 +--
.../apache/iotdb/os/utils/ObjectStorageType.java | 17 ++-
.../org/apache/iotdb/os/cache/OSFileCacheTest.java | 118 +++++++++++++++++++++
.../apache/iotdb/os/cache/OSFileChannelTest.java} | 24 ++++-
.../apache/iotdb/os/cache/OSInputStreamTest.java} | 24 ++++-
11 files changed, 260 insertions(+), 77 deletions(-)
diff --git a/object-storage/pom.xml b/object-storage/pom.xml
index 8c4a4e2aaf..c2906dce67 100644
--- a/object-storage/pom.xml
+++ b/object-storage/pom.xml
@@ -61,6 +61,11 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index 7497f15a99..e2910f0522 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++
b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -39,11 +39,10 @@ public class CacheFileManager {
private static final Logger logger =
LoggerFactory.getLogger(CacheFileManager.class);
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
- private final String[] cacheDirs = config.getCacheDirs();
private final AtomicLong cacheFileId = new AtomicLong(0);
private CacheFileManager() {
- for (String cacheDir : cacheDirs) {
+ for (String cacheDir : config.getCacheDirs()) {
File cacheDirFile = new File(cacheDir);
if (!cacheDirFile.exists()) {
cacheDirFile.mkdirs();
@@ -56,33 +55,48 @@ public class CacheFileManager {
}
private File getTmpCacheFile(long id) {
- long dirId = id % cacheDirs.length;
- return new File(cacheDirs[(int) dirId], id + TMP_CACHE_FILE_SUFFIX);
+ long dirId = id % config.getCacheDirs().length;
+ return new File(config.getCacheDirs()[(int) dirId], id +
TMP_CACHE_FILE_SUFFIX);
}
private File getCacheFile(long id) {
- long dirId = id % cacheDirs.length;
- return new File(cacheDirs[(int) dirId], id + CACHE_FILE_SUFFIX);
+ long dirId = id % config.getCacheDirs().length;
+ return new File(config.getCacheDirs()[(int) dirId], id +
CACHE_FILE_SUFFIX);
}
/** Persist data, return null when failing to persist data */
public OSFileCacheValue persist(OSFileCacheKey key, byte[] data) {
- OSFileCacheValue res = null;
- long cacheFileId = getNextCacheFileId();
- File tmpCacheFile = getTmpCacheFile(cacheFileId);
- try (FileChannel channel =
- FileChannel.open(tmpCacheFile.toPath(),
StandardOpenOption.CREATE_NEW)) {
+ long cacheFileId;
+ File tmpCacheFile;
+ // create new tmp cache file
+ try {
+ do {
+ cacheFileId = getNextCacheFileId();
+ tmpCacheFile = getTmpCacheFile(cacheFileId);
+ } while (!tmpCacheFile.createNewFile());
+ } catch (IOException e) {
+ logger.error("Fail to create cache file.", e);
+ return null;
+ }
+ // write value into tmp cache file
+ try (FileChannel channel = FileChannel.open(tmpCacheFile.toPath(),
StandardOpenOption.WRITE)) {
ByteBuffer meta = key.serialize();
+ meta.flip();
channel.write(meta);
channel.write(ByteBuffer.wrap(data));
- res =
- new OSFileCacheValue(
- tmpCacheFile, 0, meta.capacity(), data.length,
key.getStartPosition());
} catch (IOException e) {
- logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
+ logger.error("Fail to persist data to cache file {}.", tmpCacheFile, e);
tmpCacheFile.delete();
+ return null;
+ }
+ // rename tmp file to cache file
+ File cacheFile = getCacheFile(cacheFileId);
+ if (tmpCacheFile.renameTo(cacheFile)) {
+ return new OSFileCacheValue(
+ cacheFile, 0, key.serializeSize(), data.length,
key.getStartPosition());
+ } else {
+ return null;
}
- return tmpCacheFile.renameTo(getCacheFile(cacheFileId)) ? res : null;
}
/** This method is used by the recover procedure */
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index 91b60791a3..ade8ce98a7 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.os.cache;
import org.apache.iotdb.os.conf.ObjectStorageConfig;
import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
import org.apache.iotdb.os.io.ObjectStorageConnector;
-import org.apache.iotdb.os.io.aws.S3ObjectStorageConnector;
+import org.apache.iotdb.os.utils.ObjectStorageType;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -37,27 +37,18 @@ public class OSFileCache {
private static final Logger logger =
LoggerFactory.getLogger(OSFileCache.class);
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
- private static final ObjectStorageConnector connector;
-
- static {
- switch (config.getOsType()) {
- case AWS_S3:
- connector = new S3ObjectStorageConnector();
- break;
- default:
- connector = null;
- }
- }
+ private ObjectStorageConnector connector;
+ /** manage all io operations to the cache files */
+ private final CacheFileManager cacheFileManager =
CacheFileManager.getInstance();
/**
* persistent LRU cache for remote TsFile, value is loaded successfully when
it has been stored on
* the disk
*/
private final LoadingCache<OSFileCacheKey, OSFileCacheValue>
remotePos2LocalCacheFile;
- /** manage all io operations to the cache files */
- private final CacheFileManager cacheFileManager =
CacheFileManager.getInstance();
- private OSFileCache() {
+ OSFileCache() {
+ connector = ObjectStorageType.getConnector();
remotePos2LocalCacheFile =
Caffeine.newBuilder()
.maximumWeight(config.getCacheMaxDiskUsage())
@@ -80,6 +71,11 @@ public class OSFileCache {
remotePos2LocalCacheFile.put(key, value);
}
+ // test only
+ void setConnector(ObjectStorageConnector connector) {
+ this.connector = connector;
+ }
+
class OSFileCacheLoader implements CacheLoader<OSFileCacheKey,
OSFileCacheValue> {
@Override
public @Nullable OSFileCacheValue load(@NonNull OSFileCacheKey key) throws
Exception {
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index b01e9c6a01..342a91836c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -92,9 +92,6 @@ public class OSFileChannel implements Closeable {
int currentPosition = (int) position;
dst.mark();
int dstLimit = dst.limit();
- // read each cache file
- int totalReadBytes = 0;
-
// determine the read range
long startPos = position;
long endPos = position + dst.remaining();
@@ -104,6 +101,8 @@ public class OSFileChannel implements Closeable {
if (endPos > size()) {
endPos = size();
}
+ // read each cache file
+ int totalReadBytes = 0;
try {
while (startPos < endPos) {
if (currentOSFileBlock == null ||
!currentOSFileBlock.canRead(startPos)) {
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
index 2935c641e4..2c41e7c869 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
+++
b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.os.utils.ObjectStorageType;
import java.io.File;
public class ObjectStorageConfig {
- private ObjectStorageType osType = ObjectStorageType.AWS_S3;
+ private ObjectStorageType osType = ObjectStorageType.TEST;
private AWSS3Config awss3Config = new AWSS3Config();
@@ -42,10 +42,18 @@ public class ObjectStorageConfig {
return osType;
}
+ public void setOsType(ObjectStorageType osType) {
+ this.osType = osType;
+ }
+
public String[] getCacheDirs() {
return cacheDirs;
}
+ public void setCacheDirs(String[] cacheDirs) {
+ this.cacheDirs = cacheDirs;
+ }
+
public long getCacheMaxDiskUsage() {
return cacheMaxDiskUsage;
}
@@ -53,4 +61,8 @@ public class ObjectStorageConfig {
public int getCachePageSize() {
return cachePageSize;
}
+
+ public void setCachePageSize(int cachePageSize) {
+ this.cachePageSize = cachePageSize;
+ }
}
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
index 9f61ef431d..d5c9fa060c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.os.conf.ObjectStorageConfig;
import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
import org.apache.iotdb.os.exception.ObjectStorageException;
import org.apache.iotdb.os.io.ObjectStorageConnector;
-import org.apache.iotdb.os.io.aws.S3ObjectStorageConnector;
+import org.apache.iotdb.os.utils.ObjectStorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,43 +50,35 @@ public class OSFile extends File {
"Current object storage file doesn't support this operation.";
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
- private static final ObjectStorageConnector connector;
-
- static {
- switch (config.getOsType()) {
- case AWS_S3:
- connector = new S3ObjectStorageConnector();
- break;
- default:
- connector = null;
- }
- }
+ private ObjectStorageConnector connector;
private final OSURI osUri;
private long length = 0L;
+ public OSFile(OSURI osUri) {
+ super(osUri.getURI().toString());
+ this.osUri = osUri;
+ connector = ObjectStorageType.getConnector();
+ }
+
public OSFile(String pathname) {
- super(pathname);
- this.osUri = new OSURI(pathname);
+ this(new OSURI(pathname));
}
public OSFile(String parent, String child) {
- super(parent, child);
- this.osUri = new OSURI(concatPath(parent, child));
+ this(new OSURI(concatPath(parent, child)));
}
public OSFile(File parent, String child) {
- super(parent, child);
- this.osUri = new OSURI(concatPath(parent.toString(), child));
+ this(new OSURI(concatPath(parent.toString(), child)));
}
public OSFile(URI uri) {
- super(uri);
- this.osUri = new OSURI(uri);
+ this(new OSURI(uri));
}
- private String concatPath(String parent, String child) {
+ private static String concatPath(String parent, String child) {
if (parent.endsWith(FILE_SEPARATOR)) {
return parent + child;
} else {
@@ -94,11 +86,6 @@ public class OSFile extends File {
}
}
- public OSFile(OSURI osUri) {
- super(osUri.getURI());
- this.osUri = osUri;
- }
-
@Override
public String getName() {
return osUri.getKey();
@@ -435,4 +422,9 @@ public class OSFile extends File {
return null;
}
}
+
+ // test only
+ public void setConnector(ObjectStorageConnector connector) {
+ this.connector = connector;
+ }
}
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
index 699ffda13d..2c5d306086 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
+++
b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
@@ -45,15 +45,11 @@ import java.io.InputStream;
public class S3ObjectStorageConnector implements ObjectStorageConnector {
private static final String RANGE_FORMAT = "%d-%d";
- private S3Client s3Client;
-
- public S3ObjectStorageConnector() {
- s3Client =
- S3Client.builder()
- .region(Region.of(AWSS3Config.getRegion()))
- .credentialsProvider(AWSS3Config.getCredentialProvider())
- .build();
- }
+ private static final S3Client s3Client =
+ S3Client.builder()
+ .region(Region.of(AWSS3Config.getRegion()))
+ .credentialsProvider(AWSS3Config.getCredentialProvider())
+ .build();
@Override
public boolean doesObjectExist(OSURI osUri) throws ObjectStorageException {
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
b/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
index 0a3b2f578d..74a2ab0055 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
+++
b/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
@@ -18,6 +18,21 @@
*/
package org.apache.iotdb.os.utils;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
+import org.apache.iotdb.os.io.aws.S3ObjectStorageConnector;
+
public enum ObjectStorageType {
- AWS_S3,
+ TEST,
+ AWS_S3;
+
+ public static ObjectStorageConnector getConnector() {
+ switch (ObjectStorageDescriptor.getInstance().getConfig().getOsType()) {
+ case AWS_S3:
+ return new S3ObjectStorageConnector();
+ case TEST:
+ default:
+ return null;
+ }
+ }
}
diff --git
a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
new file mode 100644
index 0000000000..7ddcd56050
--- /dev/null
+++
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.os.cache;
+
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.os.fileSystem.OSURI;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class OSFileCacheTest {
+ private static final File cacheDir = new File("target" + File.separator +
"cache");
+ private static final ObjectStorageConfig config =
+ ObjectStorageDescriptor.getInstance().getConfig();
+ @InjectMocks private OSFileCache cache = new OSFileCache();
+ @InjectMocks private OSFile testFile = new OSFile(new OSURI("test_bucket",
"test_key"));
+ @Mock private ObjectStorageConnector connector;
+ private int prevCachePageSize;
+ private String[] prevCacheDirs;
+
+ @Before
+ public void setUp() throws Exception {
+ prevCachePageSize = config.getCachePageSize();
+ config.setCachePageSize(100);
+ prevCacheDirs = config.getCacheDirs();
+ config.setCacheDirs(new String[] {cacheDir.getPath()});
+ cacheDir.mkdirs();
+ CacheFileManager.getInstance().setCacheFileId(0);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ config.setCachePageSize(prevCachePageSize);
+ config.setCacheDirs(prevCacheDirs);
+ FileUtils.deleteDirectory(cacheDir);
+ }
+
+ @Test
+ public void get() throws Exception {
+ // case 1: request page 0-100; the mocked connector returns all 100 bytes
+ byte[] bytes0to100 = new byte[100];
+ for (int i = 0; i < 100; ++i) {
+ bytes0to100[i] = (byte) i;
+ }
+ when(connector.getRemoteFile(testFile.toOSURI(), 0,
100)).thenReturn(bytes0to100);
+ OSFileCacheKey key0to100 = new OSFileCacheKey(testFile, 0);
+ OSFileCacheValue value0to100 = cache.get(key0to100);
+ assertTrue(value0to100.getCacheFile().exists());
+ assertEquals(value0to100.getCacheFile().length(), value0to100.getLength());
+ try (FileChannel channel =
+ FileChannel.open(value0to100.getCacheFile().toPath(),
StandardOpenOption.READ)) {
+ byte[] readBytes = new byte[100];
+ ByteBuffer buffer = ByteBuffer.wrap(readBytes);
+ int bytesNum =
+ channel.read(
+ buffer, value0to100.getStartPositionInCacheFile() +
value0to100.getMetaSize());
+ assertEquals(100, bytesNum);
+ assertEquals(100, value0to100.getDataSize());
+ assertArrayEquals(bytes0to100, readBytes);
+ }
+ // case 2: request page 100-200; the mocked connector returns only 50 bytes (100-150)
+ byte[] bytes100to150 = new byte[150 - 100];
+ for (int i = 0; i < 150 - 100; ++i) {
+ bytes100to150[i] = (byte) (100 + i);
+ }
+ when(connector.getRemoteFile(testFile.toOSURI(), 100,
100)).thenReturn(bytes100to150);
+ OSFileCacheKey key100to150 = new OSFileCacheKey(testFile, 100);
+ OSFileCacheValue value100to150 = cache.get(key100to150);
+ assertTrue(value100to150.getCacheFile().exists());
+ try (FileChannel channel =
+ FileChannel.open(value100to150.getCacheFile().toPath(),
StandardOpenOption.READ)) {
+ byte[] readBytes = new byte[50];
+ ByteBuffer buffer = ByteBuffer.wrap(readBytes);
+ int bytesNum =
+ channel.read(
+ buffer, value100to150.getStartPositionInCacheFile() +
value100to150.getMetaSize());
+ assertEquals(50, bytesNum);
+ assertEquals(50, value100to150.getDataSize());
+ assertArrayEquals(bytes100to150, readBytes);
+ }
+ }
+}
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
similarity index 72%
copy from
object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
copy to
object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
index 0a3b2f578d..fca7fc5caa 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
+++
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
@@ -16,8 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.os.utils;
+package org.apache.iotdb.os.cache;
-public enum ObjectStorageType {
- AWS_S3,
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class OSFileChannelTest {
+
+ @Test
+ public void size() {}
+
+ @Test
+ public void position() {}
+
+ @Test
+ public void testPosition() {}
+
+ @Test
+ public void read() {}
+
+ @Test
+ public void testRead() {}
}
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
similarity index 72%
copy from
object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
copy to
object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
index 0a3b2f578d..54852aa8df 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageType.java
+++
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
@@ -16,8 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.os.utils;
+package org.apache.iotdb.os.cache;
-public enum ObjectStorageType {
- AWS_S3,
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class OSInputStreamTest {
+
+ @Test
+ public void read() {}
+
+ @Test
+ public void testRead() {}
+
+ @Test
+ public void testRead1() {}
+
+ @Test
+ public void skip() {}
+
+ @Test
+ public void available() {}
}