This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 2a85d48be9a add test for cache channel and input stream
2a85d48be9a is described below
commit 2a85d48be9afc80e02d317eac3e872c2ad86f0ec
Author: HeimingZ <[email protected]>
AuthorDate: Fri May 19 15:40:53 2023 +0800
add test for cache channel and input stream
---
.../org/apache/iotdb/os/cache/OSFileChannel.java | 9 +-
.../org/apache/iotdb/os/cache/OSInputStream.java | 17 ++-
.../org/apache/iotdb/os/cache/OSFileCacheTest.java | 4 +-
.../apache/iotdb/os/cache/OSFileChannelTest.java | 159 ++++++++++++++++++++-
.../apache/iotdb/os/cache/OSInputStreamTest.java | 137 +++++++++++++++++-
5 files changed, 303 insertions(+), 23 deletions(-)
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index 342a91836ca..db0649c3374 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -37,13 +37,18 @@ public class OSFileChannel implements Closeable {
private static final Logger logger =
LoggerFactory.getLogger(OSTsFileInput.class);
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
- private static final OSFileCache cache = OSFileCache.getInstance();
+ private final OSFileCache cache;
private final OSFile osFile;
private long position = 0;
private OSFileBlock currentOSFileBlock;
public OSFileChannel(OSFile osFile) throws IOException {
+ this(osFile, OSFileCache.getInstance());
+ }
+
+ OSFileChannel(OSFile osFile, OSFileCache cache) throws IOException {
+ this.cache = cache;
this.osFile = osFile;
}
@@ -133,7 +138,7 @@ public class OSFileChannel implements Closeable {
closeCurrentOSFileBlock();
}
- private static class OSFileBlock {
+ private class OSFileBlock {
private OSFileCacheValue cacheValue;
private FileChannel fileChannel;
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
index a972c32a7cb..68166d0ce45 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
@@ -31,7 +31,7 @@ public class OSInputStream extends InputStream {
}
@Override
- public int read() throws IOException {
+ public synchronized int read() throws IOException {
byte[] b1 = new byte[1];
int n = read(b1);
if (n == 1) return b1[0] & 0xff;
@@ -39,12 +39,15 @@ public class OSInputStream extends InputStream {
}
@Override
- public int read(byte[] b) throws IOException {
+ public synchronized int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
- public int read(byte[] b, int off, int len) throws IOException {
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
if (len == 0) {
return 0;
}
@@ -55,7 +58,7 @@ public class OSInputStream extends InputStream {
}
@Override
- public long skip(long n) throws IOException {
+ public synchronized long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}
@@ -67,12 +70,12 @@ public class OSInputStream extends InputStream {
}
@Override
- public int available() throws IOException {
+ public synchronized int available() throws IOException {
return (int) (channel.size() - channel.position());
}
@Override
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
channel.close();
}
@@ -87,7 +90,7 @@ public class OSInputStream extends InputStream {
}
@Override
- public boolean markSupported() {
+ public synchronized boolean markSupported() {
throw new UnsupportedOperationException();
}
}
diff --git
a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
index 51b8e351755..0691687e3d7 100644
---
a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
+++
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
@@ -77,7 +77,7 @@ public class OSFileCacheTest {
}
@Test
- public void get() throws Exception {
+ public void testGet() throws Exception {
// pull 0-100, and 0-100 exists
byte[] bytes0to100 = new byte[100];
for (int i = 0; i < 100; ++i) {
@@ -102,7 +102,7 @@ public class OSFileCacheTest {
// pull 100-200, but only 100-150 exists
byte[] bytes100to150 = new byte[150 - 100];
for (int i = 0; i < 150 - 100; ++i) {
- bytes0to100[i] = (byte) i;
+ bytes100to150[i] = (byte) i;
}
when(connector.getRemoteFile(testFile.toOSURI(), 100,
100)).thenReturn(bytes100to150);
OSFileCacheKey key100to150 = new OSFileCacheKey(testFile, 100);
diff --git
a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
index fca7fc5caa6..bb09faa2de9 100644
---
a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
+++
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
@@ -18,24 +18,171 @@
*/
package org.apache.iotdb.os.cache;
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.exception.ObjectStorageException;
+import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.os.fileSystem.OSURI;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
+import org.apache.iotdb.os.io.aws.S3MetaData;
+import org.apache.iotdb.os.utils.ObjectStorageType;
+
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public class OSFileChannelTest {
+ private static final File cacheDir = new File("target" + File.separator +
"cache");
+ private static final ObjectStorageConfig config =
+ ObjectStorageDescriptor.getInstance().getConfig();
+ private static final int cachePageSize = 100;
+ @InjectMocks private OSFileCache cache = new
OSFileCache(ObjectStorageType.TEST);
- @Test
- public void size() {}
+ @InjectMocks
+ private OSFile testFile =
+ new OSFile(new OSURI("test_bucket", "test_key"), ObjectStorageType.TEST);
+
+ @Mock private ObjectStorageConnector connector;
+ private int prevCachePageSize;
+ private String[] prevCacheDirs;
+
+ @Before
+ public void setUp() throws Exception {
+ prevCachePageSize = config.getCachePageSize();
+ config.setCachePageSize(cachePageSize);
+ prevCacheDirs = config.getCacheDirs();
+ config.setCacheDirs(new String[] {cacheDir.getPath()});
+ cacheDir.mkdirs();
+ CacheFileManager.getInstance().setCacheFileId(0);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ config.setCachePageSize(prevCachePageSize);
+ config.setCacheDirs(prevCacheDirs);
+ FileUtils.deleteDirectory(cacheDir);
+ }
+
+ private void prepareOSFile(int size) throws ObjectStorageException {
+ when(connector.getMetaData(testFile.toOSURI()))
+ .thenReturn(new S3MetaData(size, System.currentTimeMillis()));
+ int startPos = 0;
+ while (startPos < size) {
+ byte[] bytes = new byte[(int) Math.min(cachePageSize, size - startPos)];
+ for (int i = 0; i < bytes.length; ++i) {
+ bytes[i] = (byte) (startPos + i);
+ }
+ when(connector.getRemoteFile(testFile.toOSURI(), startPos,
cachePageSize)).thenReturn(bytes);
+ startPos += cachePageSize;
+ }
+ }
@Test
- public void position() {}
+ public void testReadFileSmallerThanPageSize() throws Exception {
+ int size = 50;
+ prepareOSFile(size);
+ try (OSFileChannel channel = new OSFileChannel(testFile, cache)) {
+ ByteBuffer dst = ByteBuffer.allocate(cachePageSize);
+ assertEquals(size, channel.read(dst));
+ dst.flip();
+ for (int i = 0; i < size; ++i) {
+ assertEquals((byte) i, dst.get());
+ }
+ }
+ }
@Test
- public void testPosition() {}
+ public void testReadFileBiggerThanPageSize() throws Exception {
+ int size = 550;
+ prepareOSFile(size);
+ try (OSFileChannel channel = new OSFileChannel(testFile, cache)) {
+ ByteBuffer dst = ByteBuffer.allocate(size);
+ assertEquals(size, channel.read(dst));
+ dst.flip();
+ for (int i = 0; i < size; ++i) {
+ assertEquals((byte) i, dst.get());
+ }
+ }
+ }
@Test
- public void read() {}
+ public void testReadByPosition() throws Exception {
+ int size = 550;
+ prepareOSFile(size);
+ try (OSFileChannel channel = new OSFileChannel(testFile, cache)) {
+ int position = 0;
+ while (position < size) {
+ ByteBuffer dst = ByteBuffer.allocate(50);
+ assertEquals(50, channel.read(dst, position));
+ dst.flip();
+ for (int i = 0; i < 50; ++i) {
+ assertEquals((byte) (position + i), dst.get());
+ }
+ position += 50;
+ }
+ }
+ }
@Test
- public void testRead() {}
+ public void testConcurrentRead() throws Exception {
+ int size = 550;
+ prepareOSFile(size);
+ try (OSFileChannel channel = new OSFileChannel(testFile, cache)) {
+ // random reads by position, issued concurrently from worker threads
+ int threadsNum = 3;
+ ExecutorService executorService =
Executors.newFixedThreadPool(threadsNum);
+ List<Future<Void>> futures = new ArrayList<>();
+ for (int i = 0; i < threadsNum * 3; ++i) {
+ Callable<Void> readTask =
+ () -> {
+ int position = 0;
+ while (position < size) {
+ ByteBuffer dst = ByteBuffer.allocate(50);
+ assertEquals(50, channel.read(dst, position));
+ dst.flip();
+ for (int j = 0; j < 50; ++j) {
+ assertEquals((byte) (position + j), dst.get());
+ }
+ position += 50;
+ }
+ return null;
+ };
+ Future<Void> future = executorService.submit(readTask);
+ futures.add(future);
+ }
+ // sequential read on the calling thread
+ int position = 0;
+ while (position < size) {
+ ByteBuffer dst = ByteBuffer.allocate(50);
+ assertEquals(50, channel.read(dst));
+ dst.flip();
+ for (int i = 0; i < 50; ++i) {
+ assertEquals((byte) (position + i), dst.get());
+ }
+ position += 50;
+ }
+ // wait for all random read tasks to finish
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ }
+ }
}
diff --git
a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
index 54852aa8df6..21741294ae5 100644
---
a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
+++
b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
@@ -18,24 +18,149 @@
*/
package org.apache.iotdb.os.cache;
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.exception.ObjectStorageException;
+import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.os.fileSystem.OSURI;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
+import org.apache.iotdb.os.io.aws.S3MetaData;
+import org.apache.iotdb.os.utils.ObjectStorageType;
+
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.InputStream;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public class OSInputStreamTest {
+ private static final File cacheDir = new File("target" + File.separator +
"cache");
+ private static final ObjectStorageConfig config =
+ ObjectStorageDescriptor.getInstance().getConfig();
+ private static final int cachePageSize = 100;
+ @InjectMocks private OSFileCache cache = new
OSFileCache(ObjectStorageType.TEST);
- @Test
- public void read() {}
+ @InjectMocks
+ private OSFile testFile =
+ new OSFile(new OSURI("test_bucket", "test_key"), ObjectStorageType.TEST);
+
+ @Mock private ObjectStorageConnector connector;
+ private int prevCachePageSize;
+ private String[] prevCacheDirs;
+
+ @Before
+ public void setUp() throws Exception {
+ prevCachePageSize = config.getCachePageSize();
+ config.setCachePageSize(cachePageSize);
+ prevCacheDirs = config.getCacheDirs();
+ config.setCacheDirs(new String[] {cacheDir.getPath()});
+ cacheDir.mkdirs();
+ CacheFileManager.getInstance().setCacheFileId(0);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ config.setCachePageSize(prevCachePageSize);
+ config.setCacheDirs(prevCacheDirs);
+ FileUtils.deleteDirectory(cacheDir);
+ }
+
+ private void prepareOSFile(int size) throws ObjectStorageException {
+ when(connector.getMetaData(testFile.toOSURI()))
+ .thenReturn(new S3MetaData(size, System.currentTimeMillis()));
+ int startPos = 0;
+ while (startPos < size) {
+ byte[] bytes = new byte[(int) Math.min(cachePageSize, size - startPos)];
+ for (int i = 0; i < bytes.length; ++i) {
+ bytes[i] = (byte) (startPos + i);
+ }
+ when(connector.getRemoteFile(testFile.toOSURI(), startPos,
cachePageSize)).thenReturn(bytes);
+ startPos += cachePageSize;
+ }
+ }
@Test
- public void testRead() {}
+ public void testReadByte() throws Exception {
+ int size = 50;
+ prepareOSFile(size);
+ try (OSFileChannel channel = new OSFileChannel(testFile, cache);
+ InputStream in = OSFileChannel.newInputStream(channel)) {
+ for (int i = 0; in.available() > 0; ++i) {
+ assertEquals((byte) i, in.read());
+ }
+ assertEquals(0, in.available());
+ }
+ }
@Test
- public void testRead1() {}
+ public void testReadByteArray() throws Exception {
+ int size = 128;
+ prepareOSFile(size);
+ try (OSFileChannel channel = new OSFileChannel(testFile, cache);
+ InputStream in = OSFileChannel.newInputStream(channel)) {
+ int position = 0;
+ while (position < size) {
+ byte[] bytes = new byte[50];
+ assertEquals(Math.min(50, size - position), in.read(bytes));
+ for (int i = 0; i < Math.min(50, size - position); ++i) {
+ assertEquals((byte) (position + i), bytes[i]);
+ }
+ position += 50;
+ }
+ assertEquals(0, in.available());
+ }
+ }
@Test
- public void skip() {}
+ public void testReadByteArrayByLen() throws Exception {
+ int size = 550;
+ prepareOSFile(size);
+ try (OSFileChannel channel = new OSFileChannel(testFile, cache);
+ InputStream in = OSFileChannel.newInputStream(channel)) {
+ byte[] bytes = new byte[size];
+ int position = 0;
+ while (position < size) {
+ assertEquals(50, in.read(bytes, position, 50));
+ for (int i = 0; i < 50; ++i) {
+ assertEquals((byte) (position + i), bytes[position + i]);
+ }
+ position += 50;
+ }
+ assertEquals(0, in.available());
+ }
+ }
@Test
- public void available() {}
+ public void testSkip() throws Exception {
+ int size = 550;
+ prepareOSFile(size);
+ try (OSFileChannel channel = new OSFileChannel(testFile, cache);
+ InputStream in = OSFileChannel.newInputStream(channel)) {
+ byte[] bytes = new byte[size];
+ int position = 0;
+ while (position < size) {
+ if (position % 100 == 0) {
+ in.skip(50);
+ position += 50;
+ continue;
+ }
+ assertEquals(50, in.read(bytes, position, 50));
+ for (int i = 0; i < 50; ++i) {
+ assertEquals((byte) (position + i), bytes[position + i]);
+ }
+ position += 50;
+ }
+ assertEquals(0, in.available());
+ }
+ }
}