This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tiered_storage by this push:
     new 2a85d48be9a add test for cache channel and input stream
2a85d48be9a is described below

commit 2a85d48be9afc80e02d317eac3e872c2ad86f0ec
Author: HeimingZ <[email protected]>
AuthorDate: Fri May 19 15:40:53 2023 +0800

    add test for cache channel and input stream
---
 .../org/apache/iotdb/os/cache/OSFileChannel.java   |   9 +-
 .../org/apache/iotdb/os/cache/OSInputStream.java   |  17 ++-
 .../org/apache/iotdb/os/cache/OSFileCacheTest.java |   4 +-
 .../apache/iotdb/os/cache/OSFileChannelTest.java   | 159 ++++++++++++++++++++-
 .../apache/iotdb/os/cache/OSInputStreamTest.java   | 137 +++++++++++++++++-
 5 files changed, 303 insertions(+), 23 deletions(-)

diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index 342a91836ca..db0649c3374 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -37,13 +37,18 @@ public class OSFileChannel implements Closeable {
   private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
   private static final ObjectStorageConfig config =
       ObjectStorageDescriptor.getInstance().getConfig();
-  private static final OSFileCache cache = OSFileCache.getInstance();
+  private final OSFileCache cache;
   private final OSFile osFile;
   private long position = 0;
 
   private OSFileBlock currentOSFileBlock;
 
   public OSFileChannel(OSFile osFile) throws IOException {
+    this(osFile, OSFileCache.getInstance());
+  }
+
+  OSFileChannel(OSFile osFile, OSFileCache cache) throws IOException {
+    this.cache = cache;
     this.osFile = osFile;
   }
 
@@ -133,7 +138,7 @@ public class OSFileChannel implements Closeable {
     closeCurrentOSFileBlock();
   }
 
-  private static class OSFileBlock {
+  private class OSFileBlock {
     private OSFileCacheValue cacheValue;
     private FileChannel fileChannel;
 
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
index a972c32a7cb..68166d0ce45 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
@@ -31,7 +31,7 @@ public class OSInputStream extends InputStream {
   }
 
   @Override
-  public int read() throws IOException {
+  public synchronized int read() throws IOException {
     byte[] b1 = new byte[1];
     int n = read(b1);
     if (n == 1) return b1[0] & 0xff;
@@ -39,12 +39,15 @@ public class OSInputStream extends InputStream {
   }
 
   @Override
-  public int read(byte[] b) throws IOException {
+  public synchronized int read(byte[] b) throws IOException {
     return read(b, 0, b.length);
   }
 
   @Override
-  public int read(byte[] b, int off, int len) throws IOException {
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
     if (len == 0) {
       return 0;
     }
@@ -55,7 +58,7 @@ public class OSInputStream extends InputStream {
   }
 
   @Override
-  public long skip(long n) throws IOException {
+  public synchronized long skip(long n) throws IOException {
     if (n <= 0) {
       return 0;
     }
@@ -67,12 +70,12 @@ public class OSInputStream extends InputStream {
   }
 
   @Override
-  public int available() throws IOException {
+  public synchronized int available() throws IOException {
     return (int) (channel.size() - channel.position());
   }
 
   @Override
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
     channel.close();
   }
 
@@ -87,7 +90,7 @@ public class OSInputStream extends InputStream {
   }
 
   @Override
-  public boolean markSupported() {
+  public synchronized boolean markSupported() {
     throw new UnsupportedOperationException();
   }
 }
diff --git a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
index 51b8e351755..0691687e3d7 100644
--- a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
+++ b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
@@ -77,7 +77,7 @@ public class OSFileCacheTest {
   }
 
   @Test
-  public void get() throws Exception {
+  public void testGet() throws Exception {
     // pull 0-100, and 0-100 exists
     byte[] bytes0to100 = new byte[100];
     for (int i = 0; i < 100; ++i) {
@@ -102,7 +102,7 @@ public class OSFileCacheTest {
     // pull 100-200, but only 100-150 exists
     byte[] bytes100to150 = new byte[150 - 100];
     for (int i = 0; i < 150 - 100; ++i) {
-      bytes0to100[i] = (byte) i;
+      bytes100to150[i] = (byte) i;
     }
     when(connector.getRemoteFile(testFile.toOSURI(), 100, 100)).thenReturn(bytes100to150);
     OSFileCacheKey key100to150 = new OSFileCacheKey(testFile, 100);
diff --git a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
index fca7fc5caa6..bb09faa2de9 100644
--- a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
+++ b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
@@ -18,24 +18,171 @@
  */
 package org.apache.iotdb.os.cache;
 
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.exception.ObjectStorageException;
+import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.os.fileSystem.OSURI;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
+import org.apache.iotdb.os.io.aws.S3MetaData;
+import org.apache.iotdb.os.utils.ObjectStorageType;
+
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.class)
 public class OSFileChannelTest {
+  private static final File cacheDir = new File("target" + File.separator + "cache");
+  private static final ObjectStorageConfig config =
+      ObjectStorageDescriptor.getInstance().getConfig();
+  private static final int cachePageSize = 100;
+  @InjectMocks private OSFileCache cache = new OSFileCache(ObjectStorageType.TEST);
 
-  @Test
-  public void size() {}
+  @InjectMocks
+  private OSFile testFile =
+      new OSFile(new OSURI("test_bucket", "test_key"), ObjectStorageType.TEST);
+
+  @Mock private ObjectStorageConnector connector;
+  private int prevCachePageSize;
+  private String[] prevCacheDirs;
+
+  @Before
+  public void setUp() throws Exception {
+    prevCachePageSize = config.getCachePageSize();
+    config.setCachePageSize(cachePageSize);
+    prevCacheDirs = config.getCacheDirs();
+    config.setCacheDirs(new String[] {cacheDir.getPath()});
+    cacheDir.mkdirs();
+    CacheFileManager.getInstance().setCacheFileId(0);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    config.setCachePageSize(prevCachePageSize);
+    config.setCacheDirs(prevCacheDirs);
+    FileUtils.deleteDirectory(cacheDir);
+  }
+
+  private void prepareOSFile(int size) throws ObjectStorageException {
+    when(connector.getMetaData(testFile.toOSURI()))
+        .thenReturn(new S3MetaData(size, System.currentTimeMillis()));
+    int startPos = 0;
+    while (startPos < size) {
+      byte[] bytes = new byte[(int) Math.min(cachePageSize, size - startPos)];
+      for (int i = 0; i < bytes.length; ++i) {
+        bytes[i] = (byte) (startPos + i);
+      }
+      when(connector.getRemoteFile(testFile.toOSURI(), startPos, cachePageSize)).thenReturn(bytes);
+      startPos += cachePageSize;
+    }
+  }
 
   @Test
-  public void position() {}
+  public void testReadFileSmallerThanPageSize() throws Exception {
+    int size = 50;
+    prepareOSFile(size);
+    try (OSFileChannel channel = new OSFileChannel(testFile, cache)) {
+      ByteBuffer dst = ByteBuffer.allocate(cachePageSize);
+      assertEquals(size, channel.read(dst));
+      dst.flip();
+      for (int i = 0; i < size; ++i) {
+        assertEquals((byte) i, dst.get());
+      }
+    }
+  }
 
   @Test
-  public void testPosition() {}
+  public void testReadFileBiggerThanPageSize() throws Exception {
+    int size = 550;
+    prepareOSFile(size);
+    try (OSFileChannel channel = new OSFileChannel(testFile, cache)) {
+      ByteBuffer dst = ByteBuffer.allocate(size);
+      assertEquals(size, channel.read(dst));
+      dst.flip();
+      for (int i = 0; i < size; ++i) {
+        assertEquals((byte) i, dst.get());
+      }
+    }
+  }
 
   @Test
-  public void read() {}
+  public void testReadByPosition() throws Exception {
+    int size = 550;
+    prepareOSFile(size);
+    try (OSFileChannel channel = new OSFileChannel(testFile, cache)) {
+      int position = 0;
+      while (position < size) {
+        ByteBuffer dst = ByteBuffer.allocate(50);
+        assertEquals(50, channel.read(dst, position));
+        dst.flip();
+        for (int i = 0; i < 50; ++i) {
+          assertEquals((byte) (position + i), dst.get());
+        }
+        position += 50;
+      }
+    }
+  }
 
   @Test
-  public void testRead() {}
+  public void testConcurrentRead() throws Exception {
+    int size = 550;
+    prepareOSFile(size);
+    try (OSFileChannel channel = new OSFileChannel(testFile, cache)) {
+      // concurrent positional reads submitted to a thread pool
+      int threadsNum = 3;
+      ExecutorService executorService = Executors.newFixedThreadPool(threadsNum);
+      List<Future<Void>> futures = new ArrayList<>();
+      for (int i = 0; i < threadsNum * 3; ++i) {
+        Callable<Void> readTask =
+            () -> {
+              int position = 0;
+              while (position < size) {
+                ByteBuffer dst = ByteBuffer.allocate(50);
+                assertEquals(50, channel.read(dst, position));
+                dst.flip();
+                for (int j = 0; j < 50; ++j) {
+                  assertEquals((byte) (position + j), dst.get());
+                }
+                position += 50;
+              }
+              return null;
+            };
+        Future<Void> future = executorService.submit(readTask);
+        futures.add(future);
+      }
+      // sequential read on the calling thread
+      int position = 0;
+      while (position < size) {
+        ByteBuffer dst = ByteBuffer.allocate(50);
+        assertEquals(50, channel.read(dst));
+        dst.flip();
+        for (int i = 0; i < 50; ++i) {
+          assertEquals((byte) (position + i), dst.get());
+        }
+        position += 50;
+      }
+      // wait for all random read tasks to finish
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+    }
+  }
 }
diff --git a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
index 54852aa8df6..21741294ae5 100644
--- a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
+++ b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
@@ -18,24 +18,149 @@
  */
 package org.apache.iotdb.os.cache;
 
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.exception.ObjectStorageException;
+import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.os.fileSystem.OSURI;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
+import org.apache.iotdb.os.io.aws.S3MetaData;
+import org.apache.iotdb.os.utils.ObjectStorageType;
+
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.InputStream;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.class)
 public class OSInputStreamTest {
+  private static final File cacheDir = new File("target" + File.separator + "cache");
+  private static final ObjectStorageConfig config =
+      ObjectStorageDescriptor.getInstance().getConfig();
+  private static final int cachePageSize = 100;
+  @InjectMocks private OSFileCache cache = new OSFileCache(ObjectStorageType.TEST);
 
-  @Test
-  public void read() {}
+  @InjectMocks
+  private OSFile testFile =
+      new OSFile(new OSURI("test_bucket", "test_key"), ObjectStorageType.TEST);
+
+  @Mock private ObjectStorageConnector connector;
+  private int prevCachePageSize;
+  private String[] prevCacheDirs;
+
+  @Before
+  public void setUp() throws Exception {
+    prevCachePageSize = config.getCachePageSize();
+    config.setCachePageSize(cachePageSize);
+    prevCacheDirs = config.getCacheDirs();
+    config.setCacheDirs(new String[] {cacheDir.getPath()});
+    cacheDir.mkdirs();
+    CacheFileManager.getInstance().setCacheFileId(0);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    config.setCachePageSize(prevCachePageSize);
+    config.setCacheDirs(prevCacheDirs);
+    FileUtils.deleteDirectory(cacheDir);
+  }
+
+  private void prepareOSFile(int size) throws ObjectStorageException {
+    when(connector.getMetaData(testFile.toOSURI()))
+        .thenReturn(new S3MetaData(size, System.currentTimeMillis()));
+    int startPos = 0;
+    while (startPos < size) {
+      byte[] bytes = new byte[(int) Math.min(cachePageSize, size - startPos)];
+      for (int i = 0; i < bytes.length; ++i) {
+        bytes[i] = (byte) (startPos + i);
+      }
+      when(connector.getRemoteFile(testFile.toOSURI(), startPos, cachePageSize)).thenReturn(bytes);
+      startPos += cachePageSize;
+    }
+  }
 
   @Test
-  public void testRead() {}
+  public void testReadByte() throws Exception {
+    int size = 50;
+    prepareOSFile(size);
+    try (OSFileChannel channel = new OSFileChannel(testFile, cache);
+        InputStream in = OSFileChannel.newInputStream(channel)) {
+      for (int i = 0; in.available() > 0; ++i) {
+        assertEquals((byte) i, in.read());
+      }
+      assertEquals(0, in.available());
+    }
+  }
 
   @Test
-  public void testRead1() {}
+  public void testReadByteArray() throws Exception {
+    int size = 128;
+    prepareOSFile(size);
+    try (OSFileChannel channel = new OSFileChannel(testFile, cache);
+        InputStream in = OSFileChannel.newInputStream(channel)) {
+      int position = 0;
+      while (position < size) {
+        byte[] bytes = new byte[50];
+        assertEquals(Math.min(50, size - position), in.read(bytes));
+        for (int i = 0; i < Math.min(50, size - position); ++i) {
+          assertEquals((byte) (position + i), bytes[i]);
+        }
+        position += 50;
+      }
+      assertEquals(0, in.available());
+    }
+  }
 
   @Test
-  public void skip() {}
+  public void testReadByteArrayByLen() throws Exception {
+    int size = 550;
+    prepareOSFile(size);
+    try (OSFileChannel channel = new OSFileChannel(testFile, cache);
+        InputStream in = OSFileChannel.newInputStream(channel)) {
+      byte[] bytes = new byte[size];
+      int position = 0;
+      while (position < size) {
+        assertEquals(50, in.read(bytes, position, 50));
+        for (int i = 0; i < 50; ++i) {
+          assertEquals((byte) (position + i), bytes[position + i]);
+        }
+        position += 50;
+      }
+      assertEquals(0, in.available());
+    }
+  }
 
   @Test
-  public void available() {}
+  public void testSkip() throws Exception {
+    int size = 550;
+    prepareOSFile(size);
+    try (OSFileChannel channel = new OSFileChannel(testFile, cache);
+        InputStream in = OSFileChannel.newInputStream(channel)) {
+      byte[] bytes = new byte[size];
+      int position = 0;
+      while (position < size) {
+        if (position % 100 == 0) {
+          in.skip(50);
+          position += 50;
+          continue;
+        }
+        assertEquals(50, in.read(bytes, position, 50));
+        for (int i = 0; i < 50; ++i) {
+          assertEquals((byte) (position + i), bytes[position + i]);
+        }
+        position += 50;
+      }
+      assertEquals(0, in.available());
+    }
+  }
 }

Reply via email to