BenWhitehead commented on code in PR #37900:
URL: https://github.com/apache/beam/pull/37900#discussion_r2991034513
##########
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java:
##########
@@ -587,4 +604,82 @@ private void assertNotExists(GcsPath path) throws
IOException {
assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
}
}
+
+ @Test
+ public void testRead() throws IOException, NoSuchAlgorithmException {
+ final GcsPath gcsPath =
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+ final String expectedHash =
"674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8";
+ final long expectedSize = 157283L;
+
+ try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) {
+ // Verify Size
+ assertEquals(expectedSize, channel.size());
+ assertEquals(0, channel.position());
+
+ // Read content into ByteBuffer
+ ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize);
+ int bytesRead = 0;
+ while (buffer.hasRemaining()) {
+ int read = channel.read(buffer);
+ if (read == -1) {
+ break;
+ }
+ bytesRead += read;
+ }
+
+ // Verify total bytes read and position
+ assertEquals(expectedSize, bytesRead);
+ assertEquals(expectedSize, channel.position());
+
+ // Flip the buffer to prepare it for reading (sets limit to current
position, position to 0)
+ buffer.flip();
+
+ // Verify hash
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ digest.update(buffer);
+ byte[] hashBytes = digest.digest();
+
+ // Convert bytes to Hex String
+ StringBuilder sb = new StringBuilder();
+ for (byte b : hashBytes) {
+ sb.append(String.format("%02x", b));
+ }
+ String actualHash = sb.toString();
+
+ assertEquals("Content hash should match", expectedHash, actualHash);
+ }
+ }
+
+ @Test
+ public void testWriteAndRead() throws IOException {
+ final String bucketName = "apache-beam-temp-bucket-12345";
+ final GcsPath targetPath = GcsPath.fromComponents(bucketName,
"test-object.txt");
+ final String content = "Hello, GCS!";
Review Comment:
The default chunk size for an upload using the writer is 16MiB. This means
that when creating an object smaller than the chunk size, there will be 1 rpc to
create the upload, and 1 rpc to upload and finalize the bytes. If verifying
chunk size is important, you'll want this content to be larger.
##########
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java:
##########
@@ -587,4 +604,82 @@ private void assertNotExists(GcsPath path) throws
IOException {
assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
}
}
+
+ @Test
+ public void testRead() throws IOException, NoSuchAlgorithmException {
+ final GcsPath gcsPath =
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+ final String expectedHash =
"674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8";
+ final long expectedSize = 157283L;
+
+ try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) {
+ // Verify Size
+ assertEquals(expectedSize, channel.size());
+ assertEquals(0, channel.position());
+
+ // Read content into ByteBuffer
+ ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize);
+ int bytesRead = 0;
+ while (buffer.hasRemaining()) {
+ int read = channel.read(buffer);
+ if (read == -1) {
+ break;
+ }
+ bytesRead += read;
+ }
+
+ // Verify total bytes read and position
+ assertEquals(expectedSize, bytesRead);
+ assertEquals(expectedSize, channel.position());
+
+ // Flip the buffer to prepare it for reading (sets limit to current
position, position to 0)
+ buffer.flip();
+
+ // Verify hash
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ digest.update(buffer);
+ byte[] hashBytes = digest.digest();
+
+ // Convert bytes to Hex String
+ StringBuilder sb = new StringBuilder();
+ for (byte b : hashBytes) {
+ sb.append(String.format("%02x", b));
+ }
+ String actualHash = sb.toString();
+
+ assertEquals("Content hash should match", expectedHash, actualHash);
+ }
+ }
+
+ @Test
+ public void testWriteAndRead() throws IOException {
+ final String bucketName = "apache-beam-temp-bucket-12345";
+ final GcsPath targetPath = GcsPath.fromComponents(bucketName,
"test-object.txt");
+ final String content = "Hello, GCS!";
+
+ try {
+ createTestBucketHelper(bucketName, false);
+
+ // Write content to a GCS file
+ CreateOptions options =
CreateOptions.builder().setExpectFileToNotExist(true).build();
+ try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) {
+
writer.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ // Read content into a buffer
+ StringBuilder readContent = new StringBuilder();
+ try (ReadableByteChannel reader = gcsUtil.open(targetPath)) {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ while (reader.read(buffer) != -1) {
+ buffer.flip();
+ readContent.append(StandardCharsets.UTF_8.decode(buffer));
+ buffer.clear();
+ }
+ }
+
+ // Verify content
+ assertEquals(content, readContent.toString());
Review Comment:
nit: I generally prefer asserting bytes rather than strings; that way
non-printable characters or similar-looking characters aren't sources of
confusion in any assertion failure.
##########
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java:
##########
@@ -587,4 +604,82 @@ private void assertNotExists(GcsPath path) throws
IOException {
assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
}
}
+
+ @Test
+ public void testRead() throws IOException, NoSuchAlgorithmException {
+ final GcsPath gcsPath =
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+ final String expectedHash =
"674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8";
+ final long expectedSize = 157283L;
+
+ try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) {
+ // Verify Size
+ assertEquals(expectedSize, channel.size());
+ assertEquals(0, channel.position());
+
+ // Read content into ByteBuffer
+ ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize);
+ int bytesRead = 0;
+ while (buffer.hasRemaining()) {
+ int read = channel.read(buffer);
+ if (read == -1) {
+ break;
+ }
+ bytesRead += read;
+ }
+
+ // Verify total bytes read and position
+ assertEquals(expectedSize, bytesRead);
+ assertEquals(expectedSize, channel.position());
+
+ // Flip the buffer to prepare it for reading (sets limit to current
position, position to 0)
+ buffer.flip();
+
+ // Verify hash
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ digest.update(buffer);
+ byte[] hashBytes = digest.digest();
+
+ // Convert bytes to Hex String
+ StringBuilder sb = new StringBuilder();
+ for (byte b : hashBytes) {
+ sb.append(String.format("%02x", b));
+ }
+ String actualHash = sb.toString();
+
+ assertEquals("Content hash should match", expectedHash, actualHash);
+ }
+ }
+
+ @Test
+ public void testWriteAndRead() throws IOException {
+ final String bucketName = "apache-beam-temp-bucket-12345";
+ final GcsPath targetPath = GcsPath.fromComponents(bucketName,
"test-object.txt");
Review Comment:
I would make this object name unique so that another test, or a past test,
doesn't interfere with the expectation that this test should create a new
object.
##########
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java:
##########
@@ -587,4 +604,82 @@ private void assertNotExists(GcsPath path) throws
IOException {
assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
}
}
+
+ @Test
+ public void testRead() throws IOException, NoSuchAlgorithmException {
+ final GcsPath gcsPath =
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+ final String expectedHash =
"674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8";
+ final long expectedSize = 157283L;
+
+ try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) {
+ // Verify Size
+ assertEquals(expectedSize, channel.size());
+ assertEquals(0, channel.position());
+
+ // Read content into ByteBuffer
+ ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize);
+ int bytesRead = 0;
+ while (buffer.hasRemaining()) {
+ int read = channel.read(buffer);
+ if (read == -1) {
+ break;
+ }
+ bytesRead += read;
+ }
Review Comment:
Use our util so that your test is more clearly focused on the logic of the
data than the movement of that data.
```suggestion
int bytesRead = StorageChannelUtils.blockingFillFrom(buffer, channel);
```
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -481,4 +493,141 @@ public void removeBucket(BucketInfo bucketInfo) throws
IOException {
throw translateStorageException(bucketInfo.getName(), null, e);
}
}
+
+ /** A bridge that allows a GCS ReadChannel to behave as a
SeekableByteChannel. */
+ private static class GcsSeekableByteChannel implements SeekableByteChannel {
+ private final ReadChannel reader;
+ private final long size;
+ private long position = 0;
+
+ GcsSeekableByteChannel(ReadChannel reader, long size) {
+ this.reader = reader;
+ this.size = size;
+ this.position = 0;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int count = reader.read(dst);
Review Comment:
After making the channel non-blocking, you'll probably want to add the
following change if you need to always fill the provided buffer as much as
possible:
```suggestion
int count = StorageChannelUtils.blockingFillFrom(dst, reader);
```
##########
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java:
##########
@@ -587,4 +604,82 @@ private void assertNotExists(GcsPath path) throws
IOException {
assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
}
}
+
+ @Test
+ public void testRead() throws IOException, NoSuchAlgorithmException {
+ final GcsPath gcsPath =
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+ final String expectedHash =
"674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8";
+ final long expectedSize = 157283L;
+
+ try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) {
+ // Verify Size
+ assertEquals(expectedSize, channel.size());
+ assertEquals(0, channel.position());
+
+ // Read content into ByteBuffer
+ ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize);
+ int bytesRead = 0;
+ while (buffer.hasRemaining()) {
+ int read = channel.read(buffer);
+ if (read == -1) {
+ break;
+ }
+ bytesRead += read;
+ }
+
+ // Verify total bytes read and position
+ assertEquals(expectedSize, bytesRead);
+ assertEquals(expectedSize, channel.position());
+
+ // Flip the buffer to prepare it for reading (sets limit to current
position, position to 0)
+ buffer.flip();
+
+ // Verify hash
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ digest.update(buffer);
+ byte[] hashBytes = digest.digest();
+
+ // Convert bytes to Hex String
+ StringBuilder sb = new StringBuilder();
+ for (byte b : hashBytes) {
+ sb.append(String.format("%02x", b));
+ }
+ String actualHash = sb.toString();
Review Comment:
nit: I'd make this a helper method to make it easier to read the test, since
the test isn't actually testing sha256 computation and encoding.
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -481,4 +493,141 @@ public void removeBucket(BucketInfo bucketInfo) throws
IOException {
throw translateStorageException(bucketInfo.getName(), null, e);
}
}
+
+ /** A bridge that allows a GCS ReadChannel to behave as a
SeekableByteChannel. */
+ private static class GcsSeekableByteChannel implements SeekableByteChannel {
+ private final ReadChannel reader;
+ private final long size;
+ private long position = 0;
+
+ GcsSeekableByteChannel(ReadChannel reader, long size) {
+ this.reader = reader;
+ this.size = size;
+ this.position = 0;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int count = reader.read(dst);
+ if (count > 0) {
+ this.position += count;
+ }
+ return count;
+ }
+
+ @Override
+ public SeekableByteChannel position(long newPosition) throws IOException {
+ checkArgument(newPosition >= 0, "Position must be non-negative: %s",
newPosition);
+ reader.seek(newPosition);
+ this.position = newPosition;
+ return this;
+ }
+
+ @Override
+ public long position() throws IOException {
+ return this.position;
+ }
+
+ @Override
+ public long size() throws IOException {
+ return size;
+ }
+
+ @Override
+ public SeekableByteChannel truncate(long size) throws IOException {
+ throw new UnsupportedOperationException(
+ "GcsSeekableByteChannels are read-only and cannot be truncated.");
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ throw new UnsupportedOperationException(
+ "GcsSeekableByteChannel are read-only and does not support
writing.");
+ }
+
+ @Override
+ public boolean isOpen() {
+ return reader.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (isOpen()) {
+ reader.close();
+ }
+ }
+ }
+
+ public SeekableByteChannel open(GcsPath path, BlobSourceOption...
sourceOptions)
+ throws IOException {
+ Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE));
+ return new GcsSeekableByteChannel(
+ blob.getStorage().reader(blob.getBlobId(), sourceOptions),
blob.getSize());
+ }
+
+ /** A bridge that allows a GCS WriteChannel to behave as a
WritableByteChannel. */
+ private static class GcsWritableByteChannel implements WritableByteChannel {
+ private final WriteChannel writer;
+ private final GcsPath gcsPath;
+
+ GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) {
+ this.writer = writer;
+ this.gcsPath = gcsPath;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ try {
+ return writer.write(src);
+ } catch (StorageException e) {
+ throw translateStorageException(gcsPath, e);
+ }
+ }
+
+ @Override
+ public boolean isOpen() {
+ return writer.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+ }
+
+ public WritableByteChannel create(
+ GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption...
writeOptions)
+ throws IOException {
+ try {
+ // Define the metadata for the new object
+ BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(),
path.getObject());
+ String type = options.getContentType();
+ if (type != null) {
+ builder.setContentType(type);
+ }
+
+ BlobInfo blobInfo = builder.build();
+
+ List<BlobWriteOption> writeOptionList = new
ArrayList<>(Arrays.asList(writeOptions));
+ if (options.getExpectFileToNotExist()) {
+ writeOptionList.add(BlobWriteOption.doesNotExist());
+ }
Review Comment:
This should be fleshed out more. If there isn't a precondition[1] present
when the writer is created, some internal rpcs won't be able to be
automatically retried. An example of the other branch can be seen in this code
sample for create from array
https://github.com/googleapis/java-storage/blob/ba5daed0c1d306f821cc26549142ff0bcfb80cbb/samples/snippets/src/main/java/com/example/storage/object/UploadObjectFromMemory.java#L53-L64
* [1] https://docs.cloud.google.com/storage/docs/request-preconditions
##########
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java:
##########
@@ -587,4 +604,82 @@ private void assertNotExists(GcsPath path) throws
IOException {
assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
}
}
+
+ @Test
+ public void testRead() throws IOException, NoSuchAlgorithmException {
+ final GcsPath gcsPath =
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+ final String expectedHash =
"674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8";
+ final long expectedSize = 157283L;
+
+ try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) {
+ // Verify Size
+ assertEquals(expectedSize, channel.size());
+ assertEquals(0, channel.position());
+
+ // Read content into ByteBuffer
+ ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize);
Review Comment:
For tests, I generally recommend allocating larger buffers than the actual
size of the expected download. By doing so, you ensure that the EOF you receive
is where you expect and not that you simply have a common subrange.
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -481,4 +493,141 @@ public void removeBucket(BucketInfo bucketInfo) throws
IOException {
throw translateStorageException(bucketInfo.getName(), null, e);
}
}
+
+ /** A bridge that allows a GCS ReadChannel to behave as a
SeekableByteChannel. */
+ private static class GcsSeekableByteChannel implements SeekableByteChannel {
+ private final ReadChannel reader;
+ private final long size;
+ private long position = 0;
+
+ GcsSeekableByteChannel(ReadChannel reader, long size) {
+ this.reader = reader;
+ this.size = size;
+ this.position = 0;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int count = reader.read(dst);
+ if (count > 0) {
+ this.position += count;
+ }
+ return count;
+ }
+
+ @Override
+ public SeekableByteChannel position(long newPosition) throws IOException {
+ checkArgument(newPosition >= 0, "Position must be non-negative: %s",
newPosition);
+ reader.seek(newPosition);
+ this.position = newPosition;
+ return this;
+ }
+
+ @Override
+ public long position() throws IOException {
+ return this.position;
+ }
+
+ @Override
+ public long size() throws IOException {
+ return size;
+ }
+
+ @Override
+ public SeekableByteChannel truncate(long size) throws IOException {
+ throw new UnsupportedOperationException(
+ "GcsSeekableByteChannels are read-only and cannot be truncated.");
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ throw new UnsupportedOperationException(
+ "GcsSeekableByteChannel are read-only and does not support
writing.");
+ }
+
+ @Override
+ public boolean isOpen() {
+ return reader.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (isOpen()) {
+ reader.close();
+ }
+ }
+ }
+
+ public SeekableByteChannel open(GcsPath path, BlobSourceOption...
sourceOptions)
+ throws IOException {
+ Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE));
+ return new GcsSeekableByteChannel(
+ blob.getStorage().reader(blob.getBlobId(), sourceOptions),
blob.getSize());
Review Comment:
I would change this to the following to avoid unnecessary ByteBuffer
allocations.
```suggestion
ReadChannel reader = blob.getStorage().reader(blob.getBlobId(),
sourceOptions);
// disable internal buffering, and make the channel non-blocking
reader.setChunkSize(0);
return new GcsSeekableByteChannel(reader, blob.getSize());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]