This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d97dda860dc [FLINK-38225] Sets proper precondition to ensure that GCS
operations are retryable (#27101)
d97dda860dc is described below
commit d97dda860dcf2bbec50514c7b7eb06a0426a25c0
Author: Oleksandr Nitavskyi <[email protected]>
AuthorDate: Tue Jan 6 02:15:58 2026 +0100
[FLINK-38225] Sets proper precondition to ensure that GCS operations are
retryable (#27101)
---
flink-filesystems/flink-gs-fs-hadoop/pom.xml | 8 +
.../flink/fs/gs/storage/GSBlobStorageImpl.java | 84 +++++++-
.../flink/fs/gs/storage/GSBlobStorageImplTest.java | 217 +++++++++++++++++++++
3 files changed, 304 insertions(+), 5 deletions(-)
diff --git a/flink-filesystems/flink-gs-fs-hadoop/pom.xml
b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
index 33f4c70e34b..cfbafe3ace3 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
@@ -36,6 +36,7 @@ under the License.
<!-- in the GCS file system documentation. -->
<fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
<fs.gs.connector.version>hadoop3-2.2.18</fs.gs.connector.version>
+ <fs.gs.cloud.nio.version>0.128.7</fs.gs.cloud.nio.version>
<!-- Set this to the highest version of grpc artifacts from
gcs-connector and google-cloud-storage -->
<fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
</properties>
@@ -149,6 +150,13 @@ under the License.
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-nio</artifactId>
+ <version>${fs.gs.cloud.nio.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<dependencyManagement>
diff --git
a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
index a254d9946b6..09c95c1357b 100644
---
a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
+++
b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.fs.gs.storage;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.utils.BlobUtils;
import org.apache.flink.util.Preconditions;
@@ -60,8 +61,7 @@ public class GSBlobStorageImpl implements GSBlobStorage {
LOGGER.trace("Creating writable blob for identifier {}",
blobIdentifier);
Preconditions.checkNotNull(blobIdentifier);
- BlobInfo blobInfo =
BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
- com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+ com.google.cloud.WriteChannel writeChannel =
createWriteChannel(blobIdentifier);
return new WriteChannel(blobIdentifier, writeChannel);
}
@@ -75,12 +75,23 @@ public class GSBlobStorageImpl implements GSBlobStorage {
Preconditions.checkNotNull(blobIdentifier);
Preconditions.checkArgument(chunkSize.getBytes() > 0);
- BlobInfo blobInfo =
BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
- com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+ com.google.cloud.WriteChannel writeChannel =
createWriteChannel(blobIdentifier);
writeChannel.setChunkSize((int) chunkSize.getBytes());
return new WriteChannel(blobIdentifier, writeChannel);
}
+    /**
+     * Opens a GCS write channel for {@code blobIdentifier}. The channel is guarded by a write
+     * option derived from the blob's current state, so the upload request carries a precondition
+     * (doesNotExist for a new blob, generationMatch for an existing one).
+     *
+     * @param blobIdentifier identifies the bucket and object to write
+     * @return the write channel created with the precondition write option
+     */
+    private com.google.cloud.WriteChannel createWriteChannel(GSBlobIdentifier blobIdentifier) {
+        // Look up the current state of the target; null means it does not exist yet.
+        BlobInfo currentBlob = storage.get(blobIdentifier.bucketName, blobIdentifier.objectName);
+        Storage.BlobWriteOption writeOption = getBlobWriteOption(currentBlob);
+
+        BlobInfo targetInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
+        return storage.writer(targetInfo, writeOption);
+    }
+
@Override
public void createBlob(GSBlobIdentifier blobIdentifier) {
LOGGER.trace("Creating empty blob {}", blobIdentifier);
@@ -153,11 +164,74 @@ public class GSBlobStorageImpl implements GSBlobStorage {
for (GSBlobIdentifier blobIdentifier : sourceBlobIdentifiers) {
builder.addSource(blobIdentifier.objectName);
}
+ BlobInfo existingTargetBlob =
+ storage.get(targetBlobIdentifier.bucketName,
targetBlobIdentifier.objectName);
+ Storage.BlobTargetOption precondition =
getBlobTargetOption(existingTargetBlob);
+ Storage.ComposeRequest request =
builder.setTargetOptions(precondition).build();
- Storage.ComposeRequest request = builder.build();
storage.compose(request);
}
+    /**
+     * Generic helper that selects the blob option expressing the appropriate precondition. Such
+     * preconditions make the operations idempotent, which allows the GCS client to safely retry
+     * transient failures such as 503 errors.
+     *
+     * <p>For a target object that does not yet exist, the {@code doesNotExist} precondition is
+     * used. This causes the request to fail if the object is created before the request runs.
+     *
+     * <p>If the target object already exists, a generation-match precondition is used. This causes
+     * the request to fail if the existing object's generation changes before the request runs.
+     *
+     * @param blobInfo The blob info to create the option for, or null if the blob does not exist
+     * @param doesNotExistSupplier Supplier for the doesNotExist option
+     * @param generationMatchFunction Function creating the generationMatch option from a
+     *     generation number
+     * @param <T> The type of the blob option (BlobTargetOption or BlobWriteOption)
+     * @return The appropriate option for the blob
+     * @throws IllegalStateException if {@code blobInfo} is non-null but carries no generation
+     */
+    @VisibleForTesting
+    <T> T getBlobOption(
+            BlobInfo blobInfo,
+            java.util.function.Supplier<T> doesNotExistSupplier,
+            java.util.function.Function<Long, T> generationMatchFunction) {
+        if (blobInfo == null) {
+            return doesNotExistSupplier.get();
+        }
+        Long generation = blobInfo.getGeneration();
+        // Factories such as Storage.BlobTargetOption::generationMatch take a primitive long, so a
+        // missing generation would otherwise surface as an opaque NullPointerException while
+        // unboxing. An existing blob without a generation cannot be guarded by generationMatch,
+        // so fail with a descriptive message instead.
+        Preconditions.checkState(
+                generation != null,
+                "Blob %s exists but has no generation; cannot set a generation-match precondition.",
+                blobInfo.getBlobId());
+        return generationMatchFunction.apply(generation);
+    }
+
+    /**
+     * Returns the {@link Storage.BlobTargetOption} precondition for a request targeting the given
+     * blob, such as the target of a compose request.
+     *
+     * @param blobInfo metadata of the current target blob, or null when it does not exist yet
+     * @return {@code doesNotExist()} for a missing blob, otherwise {@code generationMatch} on the
+     *     blob's generation
+     */
+    @VisibleForTesting
+    Storage.BlobTargetOption getBlobTargetOption(BlobInfo blobInfo) {
+        java.util.function.Supplier<Storage.BlobTargetOption> whenAbsent =
+                () -> Storage.BlobTargetOption.doesNotExist();
+        java.util.function.Function<Long, Storage.BlobTargetOption> whenPresent =
+                generation -> Storage.BlobTargetOption.generationMatch(generation);
+        return getBlobOption(blobInfo, whenAbsent, whenPresent);
+    }
+
+    /**
+     * Returns the {@link Storage.BlobWriteOption} precondition used when opening a write channel
+     * for the given blob.
+     *
+     * @param blobInfo metadata of the current blob, or null when it does not exist yet
+     * @return {@code doesNotExist()} for a missing blob, otherwise {@code generationMatch} on the
+     *     blob's generation
+     */
+    @VisibleForTesting
+    Storage.BlobWriteOption getBlobWriteOption(BlobInfo blobInfo) {
+        java.util.function.Supplier<Storage.BlobWriteOption> whenAbsent =
+                () -> Storage.BlobWriteOption.doesNotExist();
+        java.util.function.Function<Long, Storage.BlobWriteOption> whenPresent =
+                generation -> Storage.BlobWriteOption.generationMatch(generation);
+        return getBlobOption(blobInfo, whenAbsent, whenPresent);
+    }
+
@Override
public List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers) {
LOGGER.trace("Deleting blobs {}", blobIdentifiers);
diff --git
a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java
b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java
new file mode 100644
index 00000000000..264d6e59353
--- /dev/null
+++
b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.configuration.MemorySize;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test {@link GSBlobStorageImpl}. */
+class GSBlobStorageImplTest {
+
+    private static final String TEST_BUCKET = "test-bucket";
+
+    private GSBlobStorageImpl blobStorage;
+    private Storage storage;
+
+    @BeforeEach
+    void setUp() {
+        // Local fake Storage service from google-cloud-nio's testing helpers (no real GCS access).
+        storage = LocalStorageHelper.getOptions().getService();
+        blobStorage = new GSBlobStorageImpl(storage);
+    }
+
+    @ParameterizedTest(name = "{0} with null BlobInfo")
+    @MethodSource("provideOptionTypes")
+    void testGetBlobOptionWithNullBlobInfo(
+            String optionType,
+            Supplier<Object> doesNotExistSupplier,
+            Function<Long, Object> generationMatchFunction,
+            Object expectedDoesNotExist) {
+        // When the blob doesn't exist (null), the doesNotExist option must be returned.
+        Object result =
+                blobStorage.getBlobOption(null, doesNotExistSupplier, generationMatchFunction);
+
+        assertThat(result).isNotNull().isEqualTo(expectedDoesNotExist);
+    }
+
+    @ParameterizedTest(name = "{0} with existing BlobInfo")
+    @MethodSource("provideOptionTypes")
+    void testGetBlobOptionWithExistingBlobInfo(
+            String optionType,
+            Supplier<Object> doesNotExistSupplier,
+            Function<Long, Object> generationMatchFunction,
+            Object expectedDoesNotExist) {
+        long generation = 12345L;
+
+        // When the blob exists, the generationMatch option for its generation must be returned.
+        Object result =
+                blobStorage.getBlobOption(
+                        blobInfoWithGeneration(generation),
+                        doesNotExistSupplier,
+                        generationMatchFunction);
+
+        assertThat(result)
+                .isNotNull()
+                .isEqualTo(generationMatchFunction.apply(generation))
+                .isNotEqualTo(expectedDoesNotExist);
+    }
+
+    @ParameterizedTest(name = "{0} with zero generation")
+    @MethodSource("provideOptionTypes")
+    void testGetBlobOptionWithZeroGeneration(
+            String optionType,
+            Supplier<Object> doesNotExistSupplier,
+            Function<Long, Object> generationMatchFunction,
+            Object expectedDoesNotExist) {
+        // Edge case: an existing blob reporting generation 0 still gets generationMatch(0).
+        Object result =
+                blobStorage.getBlobOption(
+                        blobInfoWithGeneration(0L), doesNotExistSupplier, generationMatchFunction);
+
+        assertThat(result).isNotNull().isEqualTo(generationMatchFunction.apply(0L));
+    }
+
+    @Test
+    void testWriteBlob() throws IOException {
+        // Note: LocalStorageHelper doesn't properly track blob generation numbers,
+        // so we can only test writing to new blobs in this test
+        GSBlobIdentifier blobIdentifier = new GSBlobIdentifier(TEST_BUCKET, "test-blob");
+        byte[] data = "test data".getBytes(StandardCharsets.UTF_8);
+
+        GSBlobStorage.WriteChannel writeChannel = blobStorage.writeBlob(blobIdentifier);
+        assertThat(writeChannel).isNotNull();
+
+        int written = writeChannel.write(data, 0, data.length);
+        assertThat(written).isEqualTo(data.length);
+        writeChannel.close();
+
+        assertBlobContent(blobIdentifier, data);
+    }
+
+    @Test
+    void testWriteBlobWithChunkSize() throws IOException {
+        GSBlobIdentifier blobIdentifier = new GSBlobIdentifier(TEST_BUCKET, "chunked-blob");
+        MemorySize chunkSize = MemorySize.parse("2b");
+        byte[] data = "test data with chunks".getBytes(StandardCharsets.UTF_8);
+
+        GSBlobStorage.WriteChannel writeChannel = blobStorage.writeBlob(blobIdentifier, chunkSize);
+        assertThat(writeChannel).isNotNull();
+
+        int written = writeChannel.write(data, 0, data.length);
+        assertThat(written).isEqualTo(data.length);
+        writeChannel.close();
+
+        assertBlobContent(blobIdentifier, data);
+    }
+
+    @Test
+    void testComposeMultipleBlobs() {
+        // Note: LocalStorageHelper doesn't fully implement the compose operation,
+        // so we can only verify that the method completes without error
+        GSBlobIdentifier source1 = new GSBlobIdentifier(TEST_BUCKET, "source1");
+        GSBlobIdentifier source2 = new GSBlobIdentifier(TEST_BUCKET, "source2");
+        GSBlobIdentifier source3 = new GSBlobIdentifier(TEST_BUCKET, "source3");
+        createBlob(source1, "part1");
+        createBlob(source2, "part2");
+        createBlob(source3, "part3");
+
+        // The target does not exist yet, so compose runs with the doesNotExist precondition.
+        GSBlobIdentifier target = new GSBlobIdentifier(TEST_BUCKET, "composed-target");
+        blobStorage.compose(Arrays.asList(source1, source2, source3), target);
+
+        // Reaching this point means no exception was thrown; the composed content itself
+        // cannot be verified with LocalStorageHelper.
+    }
+
+    @Test
+    void testComposeSingleBlob() {
+        // Note: LocalStorageHelper doesn't fully implement the compose operation,
+        // so we can only verify that the method completes without error
+        GSBlobIdentifier source = new GSBlobIdentifier(TEST_BUCKET, "single-source");
+        createBlob(source, "single blob content");
+
+        // Compose into target (effectively a copy) - should complete without throwing.
+        GSBlobIdentifier target = new GSBlobIdentifier(TEST_BUCKET, "single-target");
+        blobStorage.compose(Collections.singletonList(source), target);
+
+        // Reaching this point means no exception was thrown; the composed content itself
+        // cannot be verified with LocalStorageHelper.
+    }
+
+    /** Creates a blob with the given UTF-8 content directly in storage and checks it exists. */
+    private void createBlob(GSBlobIdentifier blobIdentifier, String content) {
+        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
+        storage.create(BlobInfo.newBuilder(blobIdentifier.getBlobId()).build(), bytes);
+        assertThat(storage.get(blobIdentifier.getBlobId())).isNotNull();
+    }
+
+    /** Asserts that the blob exists in storage with exactly the expected content. */
+    private void assertBlobContent(GSBlobIdentifier blobIdentifier, byte[] expected) {
+        Blob blob = storage.get(blobIdentifier.getBlobId());
+        assertThat(blob).isNotNull();
+        assertThat(blob.getContent()).isEqualTo(expected);
+    }
+
+    /** Builds a {@link BlobInfo} for a test object that carries the given generation. */
+    private static BlobInfo blobInfoWithGeneration(long generation) {
+        return BlobInfo.newBuilder(BlobId.of(TEST_BUCKET, "test-object", generation)).build();
+    }
+
+    private static Stream<Arguments> provideOptionTypes() {
+        return Stream.of(
+                Arguments.of(
+                        "BlobTargetOption",
+                        (Supplier<Object>) Storage.BlobTargetOption::doesNotExist,
+                        (Function<Long, Object>) Storage.BlobTargetOption::generationMatch,
+                        Storage.BlobTargetOption.doesNotExist()),
+                Arguments.of(
+                        "BlobWriteOption",
+                        (Supplier<Object>) Storage.BlobWriteOption::doesNotExist,
+                        (Function<Long, Object>) Storage.BlobWriteOption::generationMatch,
+                        Storage.BlobWriteOption.doesNotExist()));
+    }
+}