This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d97dda860dc [FLINK-38225] Sets proper precondition to ensure that GCS operations are retryable (#27101)
d97dda860dc is described below

commit d97dda860dcf2bbec50514c7b7eb06a0426a25c0
Author: Oleksandr Nitavskyi <[email protected]>
AuthorDate: Tue Jan 6 02:15:58 2026 +0100

    [FLINK-38225] Sets proper precondition to ensure that GCS operations are retryable (#27101)
---
 flink-filesystems/flink-gs-fs-hadoop/pom.xml       |   8 +
 .../flink/fs/gs/storage/GSBlobStorageImpl.java     |  84 +++++++-
 .../flink/fs/gs/storage/GSBlobStorageImplTest.java | 217 +++++++++++++++++++++
 3 files changed, 304 insertions(+), 5 deletions(-)

diff --git a/flink-filesystems/flink-gs-fs-hadoop/pom.xml 
b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
index 33f4c70e34b..cfbafe3ace3 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
@@ -36,6 +36,7 @@ under the License.
                <!-- in the GCS file system documentation. -->
                <fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
                
<fs.gs.connector.version>hadoop3-2.2.18</fs.gs.connector.version>
+               <fs.gs.cloud.nio.version>0.128.7</fs.gs.cloud.nio.version>
                <!-- Set this to the highest version of grpc artifacts from 
gcs-connector and google-cloud-storage -->
                <fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
        </properties>
@@ -149,6 +150,13 @@ under the License.
                        </exclusions>
                </dependency>
 
+               <dependency>
+                       <groupId>com.google.cloud</groupId>
+                       <artifactId>google-cloud-nio</artifactId>
+                       <version>${fs.gs.cloud.nio.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <dependencyManagement>
diff --git 
a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
 
b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
index a254d9946b6..09c95c1357b 100644
--- 
a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
+++ 
b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.fs.gs.storage;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.fs.gs.utils.BlobUtils;
 import org.apache.flink.util.Preconditions;
@@ -60,8 +61,7 @@ public class GSBlobStorageImpl implements GSBlobStorage {
         LOGGER.trace("Creating writable blob for identifier {}", 
blobIdentifier);
         Preconditions.checkNotNull(blobIdentifier);
 
-        BlobInfo blobInfo = 
BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
-        com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+        com.google.cloud.WriteChannel writeChannel = 
createWriteChannel(blobIdentifier);
         return new WriteChannel(blobIdentifier, writeChannel);
     }
 
@@ -75,12 +75,23 @@ public class GSBlobStorageImpl implements GSBlobStorage {
         Preconditions.checkNotNull(blobIdentifier);
         Preconditions.checkArgument(chunkSize.getBytes() > 0);
 
-        BlobInfo blobInfo = 
BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
-        com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+        com.google.cloud.WriteChannel writeChannel = 
createWriteChannel(blobIdentifier);
         writeChannel.setChunkSize((int) chunkSize.getBytes());
         return new WriteChannel(blobIdentifier, writeChannel);
     }
 
+    /**
+     * Creates a write channel for the given blob identifier with appropriate 
preconditions.
+     *
+     * @param blobIdentifier The blob identifier to create the write channel 
for
+     * @return The write channel with appropriate write options
+     */
+    private com.google.cloud.WriteChannel createWriteChannel(GSBlobIdentifier 
blobIdentifier) {
+        BlobInfo existingBlob = storage.get(blobIdentifier.bucketName, 
blobIdentifier.objectName);
+        BlobInfo blobInfo = 
BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
+        return storage.writer(blobInfo, getBlobWriteOption(existingBlob));
+    }
+
     @Override
     public void createBlob(GSBlobIdentifier blobIdentifier) {
         LOGGER.trace("Creating empty blob {}", blobIdentifier);
@@ -153,11 +164,74 @@ public class GSBlobStorageImpl implements GSBlobStorage {
         for (GSBlobIdentifier blobIdentifier : sourceBlobIdentifiers) {
             builder.addSource(blobIdentifier.objectName);
         }
+        BlobInfo existingTargetBlob =
+                storage.get(targetBlobIdentifier.bucketName, 
targetBlobIdentifier.objectName);
+        Storage.BlobTargetOption precondition = 
getBlobTargetOption(existingTargetBlob);
+        Storage.ComposeRequest request = 
builder.setTargetOptions(precondition).build();
 
-        Storage.ComposeRequest request = builder.build();
         storage.compose(request);
     }
 
+    /**
+     * Generic helper to create blob options with appropriate preconditions. 
This ensures that the
+     * operations become idempotent or atomic, allowing the GCS client to 
safely retry the 503
+     * errors.
+     *
+     * <p>For a target object that does not yet exist, sets the DoesNotExist 
precondition. This will
+     * cause the request to fail if the object is created before the request 
runs.
+     *
+     * <p>If the destination already exists, sets a generation-match 
precondition. This will cause
+     * the request to fail if the existing object's generation changes before 
the request runs.
+     *
+     * @param blobInfo The blob info to create the option for, or null if the 
blob does not exist
+     * @param doesNotExistSupplier Supplier for the doesNotExist option
+     * @param generationMatchFunction Function to create generationMatch 
option from generation
+     *     number
+     * @param <T> The type of the blob option (BlobTargetOption or 
BlobWriteOption)
+     * @return The appropriate option for the blob
+     */
+    @VisibleForTesting
+    <T> T getBlobOption(
+            BlobInfo blobInfo,
+            java.util.function.Supplier<T> doesNotExistSupplier,
+            java.util.function.Function<Long, T> generationMatchFunction) {
+        if (blobInfo == null) {
+            return doesNotExistSupplier.get();
+        } else {
+            return generationMatchFunction.apply(blobInfo.getGeneration());
+        }
+    }
+
+    /**
+     * Creates the appropriate BlobTargetOption for the given blob info.
+     *
+     * @param blobInfo The blob info to create the target option for, or null 
if the blob does not
+     *     exist
+     * @return The appropriate target option for the blob
+     */
+    @VisibleForTesting
+    Storage.BlobTargetOption getBlobTargetOption(BlobInfo blobInfo) {
+        return getBlobOption(
+                blobInfo,
+                Storage.BlobTargetOption::doesNotExist,
+                Storage.BlobTargetOption::generationMatch);
+    }
+
+    /**
+     * Creates the appropriate BlobWriteOption for the given blob info.
+     *
+     * @param blobInfo The blob info to create the write option for, or null 
if the blob does not
+     *     exist
+     * @return The appropriate write option for the blob
+     */
+    @VisibleForTesting
+    Storage.BlobWriteOption getBlobWriteOption(BlobInfo blobInfo) {
+        return getBlobOption(
+                blobInfo,
+                Storage.BlobWriteOption::doesNotExist,
+                Storage.BlobWriteOption::generationMatch);
+    }
+
     @Override
     public List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers) {
         LOGGER.trace("Deleting blobs {}", blobIdentifiers);
diff --git 
a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java
 
b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java
new file mode 100644
index 00000000000..264d6e59353
--- /dev/null
+++ 
b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.configuration.MemorySize;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test {@link GSBlobStorageImpl}. */
+class GSBlobStorageImplTest {
+
+    private static final String TEST_BUCKET = "test-bucket";
+    private GSBlobStorageImpl blobStorage;
+    private Storage storage;
+
+    @BeforeEach
+    void setUp() {
+        storage = LocalStorageHelper.getOptions().getService();
+        blobStorage = new GSBlobStorageImpl(storage);
+    }
+
+    @ParameterizedTest(name = "{0} with null BlobInfo")
+    @MethodSource("provideOptionTypes")
+    void testGetBlobOptionWithNullBlobInfo(
+            String optionType,
+            Supplier<Object> doesNotExistSupplier,
+            Function<Long, Object> generationMatchFunction,
+            Object expectedDoesNotExist) {
+        // When blob doesn't exist (null), should return doesNotExist option
+        Object result =
+                blobStorage.getBlobOption(null, doesNotExistSupplier, 
generationMatchFunction);
+
+        assertThat(result).isNotNull();
+        assertThat(result).isEqualTo(expectedDoesNotExist);
+    }
+
+    @ParameterizedTest(name = "{0} with existing BlobInfo")
+    @MethodSource("provideOptionTypes")
+    void testGetBlobOptionWithExistingBlobInfo(
+            String optionType,
+            Supplier<Object> doesNotExistSupplier,
+            Function<Long, Object> generationMatchFunction,
+            Object expectedDoesNotExist) {
+        // Create a BlobInfo with a generation number
+        Long generation = 12345L;
+        BlobId blobId = BlobId.of("test-bucket", "test-object", generation);
+        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
+
+        // When blob exists, should return generationMatch option
+        Object result =
+                blobStorage.getBlobOption(blobInfo, doesNotExistSupplier, 
generationMatchFunction);
+
+        assertThat(result).isNotNull();
+        assertThat(result.toString()).contains(generation.toString());
+    }
+
+    @ParameterizedTest(name = "{0} with zero generation")
+    @MethodSource("provideOptionTypes")
+    void testGetBlobOptionWithZeroGeneration(
+            String optionType,
+            Supplier<Object> doesNotExistSupplier,
+            Function<Long, Object> generationMatchFunction,
+            Object expectedDoesNotExist) {
+        // Test edge case: blob with generation 0
+        BlobId blobId = BlobId.of("test-bucket", "test-object", 0L);
+        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
+
+        Object result =
+                blobStorage.getBlobOption(blobInfo, doesNotExistSupplier, 
generationMatchFunction);
+
+        assertThat(result).isNotNull();
+        assertThat(result.toString()).contains("0");
+    }
+
+    @Test
+    void testWriteBlob() throws IOException {
+        // Note: LocalStorageHelper doesn't properly track blob generation 
numbers,
+        // so we can only test writing to new blobs in this test
+        GSBlobIdentifier blobIdentifier = new GSBlobIdentifier(TEST_BUCKET, 
"test-blob");
+
+        // Write to the blob
+        GSBlobStorage.WriteChannel writeChannel = 
blobStorage.writeBlob(blobIdentifier);
+
+        assertThat(writeChannel).isNotNull();
+
+        // Write some data
+        byte[] data = "test data".getBytes();
+        int written = writeChannel.write(data, 0, data.length);
+        assertThat(written).isEqualTo(data.length);
+
+        writeChannel.close();
+
+        // Verify the blob was created
+        Blob blob = storage.get(blobIdentifier.getBlobId());
+        assertThat(blob).isNotNull();
+        assertThat(blob.getContent()).isEqualTo(data);
+    }
+
+    @Test
+    void testWriteBlobWithChunkSize() throws IOException {
+        GSBlobIdentifier blobIdentifier = new GSBlobIdentifier(TEST_BUCKET, 
"chunked-blob");
+        MemorySize chunkSize = MemorySize.parse("2b");
+
+        GSBlobStorage.WriteChannel writeChannel = 
blobStorage.writeBlob(blobIdentifier, chunkSize);
+
+        assertThat(writeChannel).isNotNull();
+
+        byte[] data = "test data with chunks".getBytes();
+        int written = writeChannel.write(data, 0, data.length);
+        assertThat(written).isEqualTo(data.length);
+
+        writeChannel.close();
+
+        // Verify the blob was created
+        Blob blob = storage.get(blobIdentifier.getBlobId());
+        assertThat(blob).isNotNull();
+        assertThat(blob.getContent()).isEqualTo(data);
+    }
+
+    @Test
+    void testComposeMultipleBlobs() {
+        // Note: LocalStorageHelper doesn't fully implement the compose 
operation,
+        // so we can only verify that the method completes without error
+
+        // Create source blobs with content
+        GSBlobIdentifier source1 = new GSBlobIdentifier(TEST_BUCKET, 
"source1");
+        GSBlobIdentifier source2 = new GSBlobIdentifier(TEST_BUCKET, 
"source2");
+        GSBlobIdentifier source3 = new GSBlobIdentifier(TEST_BUCKET, 
"source3");
+
+        byte[] data1 = "part1".getBytes();
+        byte[] data2 = "part2".getBytes();
+        byte[] data3 = "part3".getBytes();
+
+        storage.create(BlobInfo.newBuilder(source1.getBlobId()).build(), 
data1);
+        storage.create(BlobInfo.newBuilder(source2.getBlobId()).build(), 
data2);
+        storage.create(BlobInfo.newBuilder(source3.getBlobId()).build(), 
data3);
+
+        // Verify source blobs exist
+        assertThat(storage.get(source1.getBlobId())).isNotNull();
+        assertThat(storage.get(source2.getBlobId())).isNotNull();
+        assertThat(storage.get(source3.getBlobId())).isNotNull();
+
+        // Compose into target blob - this should complete without throwing an 
exception
+        GSBlobIdentifier target = new GSBlobIdentifier(TEST_BUCKET, 
"composed-target");
+        blobStorage.compose(java.util.Arrays.asList(source1, source2, 
source3), target);
+
+        // Verify the method completed (no exception thrown)
+        // The actual composition result cannot be verified with 
LocalStorageHelper
+    }
+
+    @Test
+    void testComposeSingleBlob() {
+        // Note: LocalStorageHelper doesn't fully implement the compose 
operation,
+        // so we can only verify that the method completes without error
+
+        // Create a single source blob
+        GSBlobIdentifier source = new GSBlobIdentifier(TEST_BUCKET, 
"single-source");
+        byte[] data = "single blob content".getBytes();
+        storage.create(BlobInfo.newBuilder(source.getBlobId()).build(), data);
+
+        // Verify source blob exists
+        assertThat(storage.get(source.getBlobId())).isNotNull();
+
+        // Compose into target (effectively a copy) - should complete without 
throwing
+        GSBlobIdentifier target = new GSBlobIdentifier(TEST_BUCKET, 
"single-target");
+        blobStorage.compose(java.util.Collections.singletonList(source), 
target);
+
+        // Verify the method completed (no exception thrown)
+        // The actual composition result cannot be verified with 
LocalStorageHelper
+    }
+
+    private static Stream<Arguments> provideOptionTypes() {
+        return Stream.of(
+                Arguments.of(
+                        "BlobTargetOption",
+                        (Supplier<Object>) 
Storage.BlobTargetOption::doesNotExist,
+                        (Function<Long, Object>) 
Storage.BlobTargetOption::generationMatch,
+                        Storage.BlobTargetOption.doesNotExist()),
+                Arguments.of(
+                        "BlobWriteOption",
+                        (Supplier<Object>) 
Storage.BlobWriteOption::doesNotExist,
+                        (Function<Long, Object>) 
Storage.BlobWriteOption::generationMatch,
+                        Storage.BlobWriteOption.doesNotExist()));
+    }
+}

Reply via email to