This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 462a203e67 GCP: Add range reads to GCSInputStream (#8301)
462a203e67 is described below

commit 462a203e67dd42d111a7fd2d3a0090b5aeb80833
Author: Bryan Keller <[email protected]>
AuthorDate: Tue Aug 15 03:12:54 2023 -0700

    GCP: Add range reads to GCSInputStream (#8301)
---
 .../java/org/apache/iceberg/gcp/GCPProperties.java |  1 +
 .../org/apache/iceberg/gcp/gcs/GCSInputFile.java   | 14 ++---
 .../org/apache/iceberg/gcp/gcs/GCSInputStream.java | 61 ++++++++++++++++++----
 .../apache/iceberg/gcp/gcs/GCSInputStreamTest.java | 53 +++++++++++++++++--
 .../apache/iceberg/gcp/gcs/GCSLocationTest.java    |  2 +-
 5 files changed, 109 insertions(+), 22 deletions(-)

diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
index 521eb4c6c8..55a8fdcfee 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -67,6 +67,7 @@ public class GCPProperties implements Serializable {
 
   public GCPProperties() {}
 
+  @SuppressWarnings("JavaUtilDate") // GCP API uses java.util.Date
   public GCPProperties(Map<String, String> properties) {
     projectId = properties.get(GCS_PROJECT_ID);
     clientLibToken = properties.get(GCS_CLIENT_LIB_TOKEN);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
index a911296a59..370119c632 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -26,7 +26,7 @@ import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.metrics.MetricsContext;
 
 class GCSInputFile extends BaseGCSFile implements InputFile {
-  private Long length;
+  private Long blobSize;
 
   static GCSInputFile fromLocation(
       String location, Storage storage, GCPProperties gcpProperties, MetricsContext metrics) {
@@ -50,24 +50,24 @@ class GCSInputFile extends BaseGCSFile implements InputFile {
   GCSInputFile(
       Storage storage,
       BlobId blobId,
-      Long length,
+      Long blobSize,
       GCPProperties gcpProperties,
       MetricsContext metrics) {
     super(storage, blobId, gcpProperties, metrics);
-    this.length = length;
+    this.blobSize = blobSize;
   }
 
   @Override
   public long getLength() {
-    if (length == null) {
-      this.length = getBlob().getSize();
+    if (blobSize == null) {
+      this.blobSize = getBlob().getSize();
     }
 
-    return length;
+    return blobSize;
   }
 
   @Override
   public SeekableInputStream newStream() {
-    return new GCSInputStream(storage(), blobId(), gcpProperties(), metrics());
+    return new GCSInputStream(storage(), blobId(), blobSize, gcpProperties(), metrics());
   }
 }
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
index fc09bafadb..fbb8f764de 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
@@ -23,6 +23,7 @@ import com.google.cloud.ReadChannel;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobSourceOption;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
@@ -30,6 +31,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.metrics.Counter;
 import org.apache.iceberg.metrics.MetricsContext;
@@ -43,12 +45,13 @@ import org.slf4j.LoggerFactory;
  * The GCSInputStream leverages native streaming channels from the GCS API for streaming uploads.
  * See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
  */
-class GCSInputStream extends SeekableInputStream {
+class GCSInputStream extends SeekableInputStream implements RangeReadable {
   private static final Logger LOG = LoggerFactory.getLogger(GCSInputStream.class);
 
   private final StackTraceElement[] createStack;
   private final Storage storage;
   private final BlobId blobId;
+  private Long blobSize;
   private final GCPProperties gcpProperties;
 
   private ReadChannel channel;
@@ -61,9 +64,14 @@ class GCSInputStream extends SeekableInputStream {
   private final Counter readOperations;
 
   GCSInputStream(
-      Storage storage, BlobId blobId, GCPProperties gcpProperties, MetricsContext metrics) {
+      Storage storage,
+      BlobId blobId,
+      Long blobSize,
+      GCPProperties gcpProperties,
+      MetricsContext metrics) {
     this.storage = storage;
     this.blobId = blobId;
+    this.blobSize = blobSize;
     this.gcpProperties = gcpProperties;
 
     this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES);
@@ -75,6 +83,10 @@ class GCSInputStream extends SeekableInputStream {
   }
 
   private void openStream() {
+    channel = openChannel();
+  }
+
+  private ReadChannel openChannel() {
     List<BlobSourceOption> sourceOptions = Lists.newArrayList();
 
     gcpProperties
@@ -84,9 +96,11 @@ class GCSInputStream extends SeekableInputStream {
         .userProject()
         .ifPresent(userProject -> sourceOptions.add(BlobSourceOption.userProject(userProject)));
 
-    channel = storage.reader(blobId, sourceOptions.toArray(new BlobSourceOption[0]));
+    ReadChannel result = storage.reader(blobId, sourceOptions.toArray(new BlobSourceOption[0]));
+
+    gcpProperties.channelReadChunkSize().ifPresent(result::setChunkSize);
 
-    gcpProperties.channelReadChunkSize().ifPresent(channel::setChunkSize);
+    return result;
   }
 
   @Override
@@ -123,19 +137,46 @@ class GCSInputStream extends SeekableInputStream {
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
     Preconditions.checkState(!closed, "Cannot read: already closed");
-
     byteBuffer = byteBuffer != null && byteBuffer.array() == b ? byteBuffer : ByteBuffer.wrap(b);
-    byteBuffer.position(off);
-    byteBuffer.limit(Math.min(off + len, byteBuffer.capacity()));
-
-    int bytesRead = channel.read(byteBuffer);
+    int bytesRead = read(channel, byteBuffer, off, len);
     pos += bytesRead;
     readBytes.increment(bytesRead);
     readOperations.increment();
-
     return bytesRead;
   }
 
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    try (ReadChannel readChannel = openChannel()) {
+      readChannel.seek(position);
+      readChannel.limit(position + length);
+      int bytesRead = read(readChannel, ByteBuffer.wrap(buffer), offset, length);
+      if (bytesRead < length) {
+        throw new EOFException(
+            "Reached the end of stream with " + (length - bytesRead) + " bytes left to read");
+      }
+    }
+  }
+
+  @Override
+  public int readTail(byte[] buffer, int offset, int length) throws IOException {
+    if (blobSize == null) {
+      blobSize = storage.get(blobId).getSize();
+    }
+    long startPosition = Math.max(0, blobSize - length);
+    try (ReadChannel readChannel = openChannel()) {
+      readChannel.seek(startPosition);
+      return read(readChannel, ByteBuffer.wrap(buffer), offset, length);
+    }
+  }
+
+  private int read(ReadChannel readChannel, ByteBuffer buffer, int off, int len)
+      throws IOException {
+    buffer.position(off);
+    buffer.limit(Math.min(off + len, buffer.capacity()));
+    return readChannel.read(buffer);
+  }
+
   @Override
   public void close() throws IOException {
     super.close();
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
index b04891842c..76a0fa6f52 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
@@ -31,8 +31,10 @@ import java.util.Arrays;
 import java.util.Random;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.metrics.MetricsContext;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class GCSInputStreamTest {
@@ -51,7 +53,7 @@ public class GCSInputStreamTest {
     writeGCSData(uri, data);
 
     try (SeekableInputStream in =
-        new GCSInputStream(storage, uri, gcpProperties, MetricsContext.nullMetrics())) {
+        new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
       int readSize = 1024;
       byte[] actual = new byte[readSize];
 
@@ -88,7 +90,7 @@ public class GCSInputStreamTest {
     writeGCSData(uri, data);
 
     try (SeekableInputStream in =
-        new GCSInputStream(storage, uri, gcpProperties, MetricsContext.nullMetrics())) {
+        new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
       assertThat(in.read()).isEqualTo(i0);
       assertThat(in.read()).isEqualTo(i1);
     }
@@ -116,11 +118,54 @@ public class GCSInputStreamTest {
     assertThat(actual).isEqualTo(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd));
   }
 
+  @Test
+  public void testRangeRead() throws Exception {
+    BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat");
+    int dataSize = 1024 * 1024 * 10;
+    byte[] expected = randomData(dataSize);
+    byte[] actual = new byte[dataSize];
+
+    long position;
+    int offset;
+    int length;
+
+    writeGCSData(uri, expected);
+
+    try (RangeReadable in =
+        new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
+      // first 1k
+      position = 0;
+      offset = 0;
+      length = 1024;
+      readAndCheckRanges(in, expected, position, actual, offset, length);
+
+      // last 1k
+      position = dataSize - 1024;
+      offset = dataSize - 1024;
+      readAndCheckRanges(in, expected, position, actual, offset, length);
+
+      // middle 2k
+      position = dataSize / 2 - 1024;
+      offset = dataSize / 2 - 1024;
+      length = 1024 * 2;
+      readAndCheckRanges(in, expected, position, actual, offset, length);
+    }
+  }
+
+  private void readAndCheckRanges(
+      RangeReadable in, byte[] original, long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    in.readFully(position, buffer, offset, length);
+
+    Assertions.assertThat(Arrays.copyOfRange(buffer, offset, offset + length))
+        .isEqualTo(Arrays.copyOfRange(original, offset, offset + length));
+  }
+
   @Test
   public void testClose() throws Exception {
     BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/closed.dat");
     SeekableInputStream closed =
-        new GCSInputStream(storage, blobId, gcpProperties, MetricsContext.nullMetrics());
+        new GCSInputStream(storage, blobId, null, gcpProperties, MetricsContext.nullMetrics());
     closed.close();
     assertThatThrownBy(() -> closed.seek(0)).isInstanceOf(IllegalStateException.class);
   }
@@ -133,7 +178,7 @@ public class GCSInputStreamTest {
     writeGCSData(blobId, data);
 
     try (SeekableInputStream in =
-        new GCSInputStream(storage, blobId, gcpProperties, MetricsContext.nullMetrics())) {
+        new GCSInputStream(storage, blobId, null, gcpProperties, MetricsContext.nullMetrics())) {
       in.seek(data.length / 2);
       byte[] actual = new byte[data.length / 2];
 
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
index b92cfc35ae..551eb1374d 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
@@ -49,7 +49,7 @@ public class GCSLocationTest {
   }
 
   @Test
-  public void tesInvalidScheme() {
+  public void testInvalidScheme() {
     Assertions.assertThatThrownBy(() -> new GCSLocation("s3://bucket/path/to/prefix"))
         .isInstanceOf(ValidationException.class)
         .hasMessage("Invalid GCS URI, invalid scheme: s3");

Reply via email to