This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 462a203e67 GCP: Add range reads to GCSInputStream (#8301)
462a203e67 is described below
commit 462a203e67dd42d111a7fd2d3a0090b5aeb80833
Author: Bryan Keller <[email protected]>
AuthorDate: Tue Aug 15 03:12:54 2023 -0700
GCP: Add range reads to GCSInputStream (#8301)
---
.../java/org/apache/iceberg/gcp/GCPProperties.java | 1 +
.../org/apache/iceberg/gcp/gcs/GCSInputFile.java | 14 ++---
.../org/apache/iceberg/gcp/gcs/GCSInputStream.java | 61 ++++++++++++++++++----
.../apache/iceberg/gcp/gcs/GCSInputStreamTest.java | 53 +++++++++++++++++--
.../apache/iceberg/gcp/gcs/GCSLocationTest.java | 2 +-
5 files changed, 109 insertions(+), 22 deletions(-)
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
index 521eb4c6c8..55a8fdcfee 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -67,6 +67,7 @@ public class GCPProperties implements Serializable {
public GCPProperties() {}
+ @SuppressWarnings("JavaUtilDate") // GCP API uses java.util.Date
public GCPProperties(Map<String, String> properties) {
projectId = properties.get(GCS_PROJECT_ID);
clientLibToken = properties.get(GCS_CLIENT_LIB_TOKEN);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
index a911296a59..370119c632 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -26,7 +26,7 @@ import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.MetricsContext;
class GCSInputFile extends BaseGCSFile implements InputFile {
- private Long length;
+ private Long blobSize;
static GCSInputFile fromLocation(
String location, Storage storage, GCPProperties gcpProperties,
MetricsContext metrics) {
@@ -50,24 +50,24 @@ class GCSInputFile extends BaseGCSFile implements InputFile {
GCSInputFile(
Storage storage,
BlobId blobId,
- Long length,
+ Long blobSize,
GCPProperties gcpProperties,
MetricsContext metrics) {
super(storage, blobId, gcpProperties, metrics);
- this.length = length;
+ this.blobSize = blobSize;
}
@Override
public long getLength() {
- if (length == null) {
- this.length = getBlob().getSize();
+ if (blobSize == null) {
+ this.blobSize = getBlob().getSize();
}
- return length;
+ return blobSize;
}
@Override
public SeekableInputStream newStream() {
- return new GCSInputStream(storage(), blobId(), gcpProperties(), metrics());
+ return new GCSInputStream(storage(), blobId(), blobSize, gcpProperties(),
metrics());
}
}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
index fc09bafadb..fbb8f764de 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
@@ -23,6 +23,7 @@ import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobSourceOption;
+import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
@@ -30,6 +31,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
@@ -43,12 +45,13 @@ import org.slf4j.LoggerFactory;
* The GCSInputStream leverages native streaming channels from the GCS API for streaming uploads.
* See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
*/
-class GCSInputStream extends SeekableInputStream {
+class GCSInputStream extends SeekableInputStream implements RangeReadable {
private static final Logger LOG =
LoggerFactory.getLogger(GCSInputStream.class);
private final StackTraceElement[] createStack;
private final Storage storage;
private final BlobId blobId;
+ private Long blobSize;
private final GCPProperties gcpProperties;
private ReadChannel channel;
@@ -61,9 +64,14 @@ class GCSInputStream extends SeekableInputStream {
private final Counter readOperations;
GCSInputStream(
- Storage storage, BlobId blobId, GCPProperties gcpProperties,
MetricsContext metrics) {
+ Storage storage,
+ BlobId blobId,
+ Long blobSize,
+ GCPProperties gcpProperties,
+ MetricsContext metrics) {
this.storage = storage;
this.blobId = blobId;
+ this.blobSize = blobSize;
this.gcpProperties = gcpProperties;
this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES,
Unit.BYTES);
@@ -75,6 +83,10 @@ class GCSInputStream extends SeekableInputStream {
}
private void openStream() {
+ channel = openChannel();
+ }
+
+ private ReadChannel openChannel() {
List<BlobSourceOption> sourceOptions = Lists.newArrayList();
gcpProperties
@@ -84,9 +96,11 @@ class GCSInputStream extends SeekableInputStream {
.userProject()
.ifPresent(userProject ->
sourceOptions.add(BlobSourceOption.userProject(userProject)));
- channel = storage.reader(blobId, sourceOptions.toArray(new
BlobSourceOption[0]));
+ ReadChannel result = storage.reader(blobId, sourceOptions.toArray(new
BlobSourceOption[0]));
+
+ gcpProperties.channelReadChunkSize().ifPresent(result::setChunkSize);
- gcpProperties.channelReadChunkSize().ifPresent(channel::setChunkSize);
+ return result;
}
@Override
@@ -123,19 +137,46 @@ class GCSInputStream extends SeekableInputStream {
@Override
public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
-
byteBuffer = byteBuffer != null && byteBuffer.array() == b ? byteBuffer :
ByteBuffer.wrap(b);
- byteBuffer.position(off);
- byteBuffer.limit(Math.min(off + len, byteBuffer.capacity()));
-
- int bytesRead = channel.read(byteBuffer);
+ int bytesRead = read(channel, byteBuffer, off, len);
pos += bytesRead;
readBytes.increment(bytesRead);
readOperations.increment();
-
return bytesRead;
}
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
+ try (ReadChannel readChannel = openChannel()) {
+ readChannel.seek(position);
+ readChannel.limit(position + length);
+ int bytesRead = read(readChannel, ByteBuffer.wrap(buffer), offset,
length);
+ if (bytesRead < length) {
+ throw new EOFException(
+ "Reached the end of stream with " + (length - bytesRead) + " bytes
left to read");
+ }
+ }
+ }
+
+ @Override
+ public int readTail(byte[] buffer, int offset, int length) throws
IOException {
+ if (blobSize == null) {
+ blobSize = storage.get(blobId).getSize();
+ }
+ long startPosition = Math.max(0, blobSize - length);
+ try (ReadChannel readChannel = openChannel()) {
+ readChannel.seek(startPosition);
+ return read(readChannel, ByteBuffer.wrap(buffer), offset, length);
+ }
+ }
+
+ private int read(ReadChannel readChannel, ByteBuffer buffer, int off, int
len)
+ throws IOException {
+ buffer.position(off);
+ buffer.limit(Math.min(off + len, buffer.capacity()));
+ return readChannel.read(buffer);
+ }
+
@Override
public void close() throws IOException {
super.close();
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
index b04891842c..76a0fa6f52 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
@@ -31,8 +31,10 @@ import java.util.Arrays;
import java.util.Random;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.MetricsContext;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
public class GCSInputStreamTest {
@@ -51,7 +53,7 @@ public class GCSInputStreamTest {
writeGCSData(uri, data);
try (SeekableInputStream in =
- new GCSInputStream(storage, uri, gcpProperties,
MetricsContext.nullMetrics())) {
+ new GCSInputStream(storage, uri, null, gcpProperties,
MetricsContext.nullMetrics())) {
int readSize = 1024;
byte[] actual = new byte[readSize];
@@ -88,7 +90,7 @@ public class GCSInputStreamTest {
writeGCSData(uri, data);
try (SeekableInputStream in =
- new GCSInputStream(storage, uri, gcpProperties,
MetricsContext.nullMetrics())) {
+ new GCSInputStream(storage, uri, null, gcpProperties,
MetricsContext.nullMetrics())) {
assertThat(in.read()).isEqualTo(i0);
assertThat(in.read()).isEqualTo(i1);
}
@@ -116,11 +118,54 @@ public class GCSInputStreamTest {
assertThat(actual).isEqualTo(Arrays.copyOfRange(original, (int)
rangeStart, (int) rangeEnd));
}
+ @Test
+ public void testRangeRead() throws Exception {
+ BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat");
+ int dataSize = 1024 * 1024 * 10;
+ byte[] expected = randomData(dataSize);
+ byte[] actual = new byte[dataSize];
+
+ long position;
+ int offset;
+ int length;
+
+ writeGCSData(uri, expected);
+
+ try (RangeReadable in =
+ new GCSInputStream(storage, uri, null, gcpProperties,
MetricsContext.nullMetrics())) {
+ // first 1k
+ position = 0;
+ offset = 0;
+ length = 1024;
+ readAndCheckRanges(in, expected, position, actual, offset, length);
+
+ // last 1k
+ position = dataSize - 1024;
+ offset = dataSize - 1024;
+ readAndCheckRanges(in, expected, position, actual, offset, length);
+
+ // middle 2k
+ position = dataSize / 2 - 1024;
+ offset = dataSize / 2 - 1024;
+ length = 1024 * 2;
+ readAndCheckRanges(in, expected, position, actual, offset, length);
+ }
+ }
+
+ private void readAndCheckRanges(
+ RangeReadable in, byte[] original, long position, byte[] buffer, int
offset, int length)
+ throws IOException {
+ in.readFully(position, buffer, offset, length);
+
+ Assertions.assertThat(Arrays.copyOfRange(buffer, offset, offset + length))
+ .isEqualTo(Arrays.copyOfRange(original, offset, offset + length));
+ }
+
@Test
public void testClose() throws Exception {
BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/closed.dat");
SeekableInputStream closed =
- new GCSInputStream(storage, blobId, gcpProperties,
MetricsContext.nullMetrics());
+ new GCSInputStream(storage, blobId, null, gcpProperties,
MetricsContext.nullMetrics());
closed.close();
assertThatThrownBy(() ->
closed.seek(0)).isInstanceOf(IllegalStateException.class);
}
@@ -133,7 +178,7 @@ public class GCSInputStreamTest {
writeGCSData(blobId, data);
try (SeekableInputStream in =
- new GCSInputStream(storage, blobId, gcpProperties,
MetricsContext.nullMetrics())) {
+ new GCSInputStream(storage, blobId, null, gcpProperties,
MetricsContext.nullMetrics())) {
in.seek(data.length / 2);
byte[] actual = new byte[data.length / 2];
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
index b92cfc35ae..551eb1374d 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
@@ -49,7 +49,7 @@ public class GCSLocationTest {
}
@Test
- public void tesInvalidScheme() {
+ public void testInvalidScheme() {
Assertions.assertThatThrownBy(() -> new
GCSLocation("s3://bucket/path/to/prefix"))
.isInstanceOf(ValidationException.class)
.hasMessage("Invalid GCS URI, invalid scheme: s3");