This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7235088c490 GCS client library migration in Java SDK - part 3 (#37900)
7235088c490 is described below
commit 7235088c490112f3bf1f38da770cc7338450b8e9
Author: Shunping Huang <[email protected]>
AuthorDate: Wed May 6 12:33:52 2026 -0400
GCS client library migration in Java SDK - part 3 (#37900)
* Implement open method for gcsutilv2. Add an integration test.
* Implement create method and add an integration test.
* Store the gcs path into GcsWritableByteChannel.
* Rename the new create method to createV2.
* Revise according to reviewer comments.
---
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 23 +++
.../beam/sdk/extensions/gcp/util/GcsUtilV2.java | 163 ++++++++++++++++++++-
.../gcp/util/GcsUtilParameterizedIT.java | 116 +++++++++++++--
3 files changed, 291 insertions(+), 11 deletions(-)
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index e3f01dd8529..ed727d495cf 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -25,6 +25,8 @@ import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.BlobSourceOption;
+import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.Storage.BucketGetOption;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
@@ -186,9 +188,19 @@ public class GcsUtil {
}
public SeekableByteChannel open(GcsPath path) throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.open(path);
+ }
return delegate.open(path);
}
+ public SeekableByteChannel openV2(GcsPath path, BlobSourceOption... options)
throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.open(path, options);
+ }
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+
/** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
@Deprecated
public WritableByteChannel create(GcsPath path, String type) throws
IOException {
@@ -254,9 +266,20 @@ public class GcsUtil {
}
public WritableByteChannel create(GcsPath path, CreateOptions options)
throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.create(path, options.delegate);
+ }
return delegate.create(path, options.delegate);
}
+ public WritableByteChannel createV2(
+ GcsPath path, CreateOptions options, BlobWriteOption... writeOptions)
throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.create(path, options.delegate, writeOptions);
+ }
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+
public void verifyBucketAccessible(GcsPath path) throws IOException {
if (delegateV2 != null) {
delegateV2.verifyBucketAccessible(path);
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
index b00b7ce0d72..9119dd79652 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
@@ -23,6 +23,8 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import com.google.api.gax.paging.Page;
import com.google.auto.value.AutoValue;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
@@ -33,21 +35,29 @@ import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.BlobSourceOption;
+import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.Storage.BucketField;
import com.google.cloud.storage.Storage.BucketGetOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageBatchResult;
+import com.google.cloud.storage.StorageChannelUtils;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -70,6 +80,8 @@ class GcsUtilV2 {
private Storage storage;
+ private final @Nullable Integer uploadBufferSizeBytes;
+
/** Maximum number of items to retrieve per Objects.List request. */
private static final long MAX_LIST_BLOBS_PER_CALL = 1024;
@@ -85,13 +97,14 @@ class GcsUtilV2 {
GcsUtilV2(PipelineOptions options) {
String projectId = options.as(GcpOptions.class).getProject();
storage =
StorageOptions.newBuilder().setProjectId(projectId).build().getService();
+ uploadBufferSizeBytes =
options.as(GcsOptions.class).getGcsUploadBufferSizeBytes();
}
@SuppressWarnings({
"nullness" // For Creating AccessDeniedException FileNotFoundException, and
// FileAlreadyExistsException with null.
})
- private IOException translateStorageException(GcsPath gcsPath,
StorageException e) {
+ private static IOException translateStorageException(GcsPath gcsPath,
StorageException e) {
switch (e.getCode()) {
case 403:
return new AccessDeniedException(gcsPath.toString(), null,
e.getMessage());
@@ -481,4 +494,158 @@ class GcsUtilV2 {
throw translateStorageException(bucketInfo.getName(), null, e);
}
}
+
+ /** A bridge that allows a GCS ReadChannel to behave as a
SeekableByteChannel. */
+ private static class GcsSeekableByteChannel implements SeekableByteChannel {
+ private final ReadChannel reader;
+ private final long size;
+ private long position = 0;
+
+ GcsSeekableByteChannel(ReadChannel reader, long size) {
+ this.reader = reader;
+ this.size = size;
+ this.position = 0;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int count = StorageChannelUtils.blockingFillFrom(dst, reader);
+ if (count > 0) {
+ this.position += count;
+ }
+ return count;
+ }
+
+ @Override
+ public SeekableByteChannel position(long newPosition) throws IOException {
+ checkArgument(newPosition >= 0, "Position must be non-negative: %s",
newPosition);
+ reader.seek(newPosition);
+ this.position = newPosition;
+ return this;
+ }
+
+ @Override
+ public long position() throws IOException {
+ return this.position;
+ }
+
+ @Override
+ public long size() throws IOException {
+ return size;
+ }
+
+ @Override
+ public SeekableByteChannel truncate(long size) throws IOException {
+ throw new UnsupportedOperationException(
+ "GcsSeekableByteChannels are read-only and cannot be truncated.");
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ throw new UnsupportedOperationException(
+ "GcsSeekableByteChannel are read-only and does not support
writing.");
+ }
+
+ @Override
+ public boolean isOpen() {
+ return reader.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (isOpen()) {
+ reader.close();
+ }
+ }
+ }
+
+ public SeekableByteChannel open(GcsPath path, BlobSourceOption...
sourceOptions)
+ throws IOException {
+ Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE));
+ ReadChannel reader = blob.getStorage().reader(blob.getBlobId(),
sourceOptions);
+ // disable internal buffering, and make the channel non-blocking
+ reader.setChunkSize(0);
+ return new GcsSeekableByteChannel(reader, blob.getSize());
+ }
+
+ /** A bridge that allows a GCS WriteChannel to behave as a
WritableByteChannel. */
+ private static class GcsWritableByteChannel implements WritableByteChannel {
+ private final WriteChannel writer;
+ private final GcsPath gcsPath;
+
+ GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) {
+ this.writer = writer;
+ this.gcsPath = gcsPath;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ try {
+ return writer.write(src);
+ } catch (StorageException e) {
+ throw translateStorageException(gcsPath, e);
+ }
+ }
+
+ @Override
+ public boolean isOpen() {
+ return writer.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ // close() completes the upload and may still send data buffered by setChunkSize, so it
+ // can fail with a StorageException just like write(); translate it the same way.
+ writer.close();
+ } catch (StorageException e) {
+ throw translateStorageException(gcsPath, e);
+ }
+ }
+ }
+
+ public WritableByteChannel create(
+ GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption...
writeOptions)
+ throws IOException {
+ try {
+ // Define the metadata for the new object
+ BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(),
path.getObject());
+ String type = options.getContentType();
+ if (type != null) {
+ builder.setContentType(type);
+ }
+
+ BlobInfo blobInfo = builder.build();
+
+ List<BlobWriteOption> writeOptionList = new
ArrayList<>(Arrays.asList(writeOptions));
+ if (options.getExpectFileToNotExist()) {
+ writeOptionList.add(BlobWriteOption.doesNotExist());
+ } else {
+ // We do not merge this check with the getExpectFileToNotExist()
branch above
+ // because we don't want to always make the storage.get() RPC call.
+ Blob blob = storage.get(path.getBucket(), path.getObject());
+ if (blob == null) {
+ writeOptionList.add(BlobWriteOption.doesNotExist());
+ } else {
+
writeOptionList.add(BlobWriteOption.generationMatch(blob.getGeneration()));
+ }
+ }
+ // Open a WriteChannel from the storage service
+ WriteChannel writer =
+ storage.writer(blobInfo, writeOptionList.toArray(new
BlobWriteOption[0]));
+ Integer uploadBufferSizeBytes =
+ options.getUploadBufferSizeBytes() != null
+ ? options.getUploadBufferSizeBytes()
+ : this.uploadBufferSizeBytes;
+ if (uploadBufferSizeBytes != null) {
+ writer.setChunkSize(uploadBufferSizeBytes);
+ }
+
+ // Return the bridge wrapper
+ return new GcsWritableByteChannel(writer, path);
+
+ } catch (StorageException e) {
+ throw translateStorageException(path, e);
+ }
+ }
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
index 80ffd72924f..5759bb10a65 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.gcp.util;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
@@ -28,15 +29,25 @@ import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.StorageChannelUtils;
+import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.nio.file.FileAlreadyExistsException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
@@ -301,7 +312,8 @@ public class GcsUtilParameterizedIT {
}
}
- private List<GcsPath> createTestBucketHelper(String bucketName) throws
IOException {
+ private List<GcsPath> createTestBucketHelper(String bucketName, boolean
copyData)
+ throws IOException {
final List<GcsPath> originPaths =
Arrays.asList(
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"),
@@ -316,16 +328,24 @@ public class GcsUtilParameterizedIT {
if (experiment.equals("use_gcsutil_v2")) {
gcsUtil.createBucket(BucketInfo.of(bucketName));
- gcsUtil.copyV2(originPaths, testPaths);
+ if (copyData) {
+ gcsUtil.copyV2(originPaths, testPaths);
+ } else {
+ return Collections.emptyList();
+ }
} else {
GcsOptions gcsOptions = options.as(GcsOptions.class);
gcsUtil.createBucket(gcsOptions.getProject(), new
Bucket().setName(bucketName));
- final List<String> originList =
- originPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
- final List<String> testList =
- testPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
- gcsUtil.copy(originList, testList);
+ if (copyData) {
+ final List<String> originList =
+ originPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ final List<String> testList =
+ testPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ gcsUtil.copy(originList, testList);
+ } else {
+ return Collections.emptyList();
+ }
}
return testPaths;
@@ -355,7 +375,7 @@ public class GcsUtilParameterizedIT {
final String nonExistentBucket = "my-random-test-bucket-12345";
try {
- final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+ final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket,
true);
final List<GcsPath> dstPaths =
srcPaths.stream()
.map(o -> GcsPath.fromComponents(existingBucket, o.getObject() +
".bak"))
@@ -423,7 +443,7 @@ public class GcsUtilParameterizedIT {
final String nonExistentBucket = "my-random-test-bucket-12345";
try {
- final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+ final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket,
true);
final List<GcsPath> errPaths =
srcPaths.stream()
.map(o -> GcsPath.fromComponents(nonExistentBucket,
o.getObject()))
@@ -485,7 +505,7 @@ public class GcsUtilParameterizedIT {
final String nonExistentBucket = "my-random-test-bucket-12345";
try {
- final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+ final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket,
true);
final List<GcsPath> tmpPaths =
srcPaths.stream()
.map(o -> GcsPath.fromComponents(existingBucket, "tmp/" +
o.getObject()))
@@ -587,4 +607,80 @@ public class GcsUtilParameterizedIT {
assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
}
}
+
+ String computeHash(ByteBuffer buffer) throws NoSuchAlgorithmException {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ digest.update(buffer);
+ byte[] hashBytes = digest.digest();
+
+ // Convert bytes to Hex String
+ StringBuilder sb = new StringBuilder();
+ for (byte b : hashBytes) {
+ sb.append(String.format("%02x", b));
+ }
+ return sb.toString();
+ }
+
+ @Test
+ public void testRead() throws IOException, NoSuchAlgorithmException {
+ final GcsPath gcsPath =
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+ final String expectedHash =
"674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8";
+ final long expectedSize = 157283L;
+
+ try (SeekableByteChannel channel = gcsUtil.open(gcsPath)) {
+ // Verify Size
+ assertEquals(expectedSize, channel.size());
+ assertEquals(0, channel.position());
+
+ // Read content into ByteBuffer.
+ // Allocate a larger buffer to ensure we receive the EOF at the expected
place.
+ ByteBuffer buffer = ByteBuffer.allocate((int) expectedSize + 1024);
+ int bytesRead = StorageChannelUtils.blockingFillFrom(buffer, channel);
+
+ // Verify total bytes read and position
+ assertEquals(expectedSize, bytesRead);
+ assertEquals(expectedSize, channel.position());
+
+ // Flip the buffer to prepare it for reading (sets limit to current
position, position to 0)
+ buffer.flip();
+
+ // Verify hash
+ String actualHash = computeHash(buffer);
+ assertEquals("Content hash should match", expectedHash, actualHash);
+ }
+ }
+
+ @Test
+ public void testWriteAndRead() throws IOException {
+ final String bucketName = "apache-beam-temp-bucket-12345";
+ final GcsPath targetPath =
+ GcsPath.fromComponents(bucketName, "test-object-" +
java.util.UUID.randomUUID() + ".txt");
+ final byte[] content = "Hello, GCS!".getBytes(StandardCharsets.UTF_8);
+
+ try {
+ createTestBucketHelper(bucketName, false);
+
+ // Write content to a GCS file
+ CreateOptions options =
CreateOptions.builder().setExpectFileToNotExist(true).build();
+ try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) {
+ writer.write(ByteBuffer.wrap(content));
+ }
+
+ // Read content into a buffer
+ ByteArrayOutputStream readContent = new ByteArrayOutputStream();
+ try (ReadableByteChannel reader = gcsUtil.open(targetPath)) {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ while (reader.read(buffer) != -1) {
+ buffer.flip();
+ readContent.write(buffer.array(), 0, buffer.limit());
+ buffer.clear();
+ }
+ }
+
+ // Verify content
+ assertArrayEquals(content, readContent.toByteArray());
+ } finally {
+ tearDownTestBucketHelper(bucketName);
+ }
+ }
}