This is an automated email from the ASF dual-hosted git repository.
MartijnVisser pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.3 by this push:
new 45ccbca8244 [FLINK-39136][filesystems] Bump google-cloud-storage to 2.68.0 in flink-gs-fs-hadoop (#28286)
45ccbca8244 is described below
commit 45ccbca82446848454f3e919d7ca6bf17689c258
Author: Martijn Visser <[email protected]>
AuthorDate: Wed Jun 3 08:59:01 2026 +0200
[FLINK-39136][filesystems] Bump google-cloud-storage to 2.68.0 in flink-gs-fs-hadoop (#28286)
* [FLINK-39136][filesystems] Bump google-cloud-storage to 2.68.0 in flink-gs-fs-hadoop
The GCS file system bundled google-cloud-storage 2.29.1, which throws a
NullPointerException instead of retrying certain GCS 503 Service Unavailable
errors during resumable uploads, breaking checkpointing for jobs writing to
gs:// via a RecoverableWriter. The upstream fix is in
googleapis/java-storage#2987.
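
To illustrate the behaviour the bump is meant to restore (a transient 503 is retried rather than surfacing as an unrelated exception), here is a minimal sketch of a retry loop with exponential backoff. It is not the google-cloud-storage implementation: `RetryOn503Sketch`, `HttpStatusException`, and the backoff parameters are hypothetical names chosen for illustration only.

```java
import java.util.function.Supplier;

final class RetryOn503Sketch {

    /** Hypothetical stand-in for an HTTP failure; not a google-cloud-storage class. */
    static final class HttpStatusException extends RuntimeException {
        final int statusCode;

        HttpStatusException(int statusCode) {
            super("HTTP " + statusCode);
            this.statusCode = statusCode;
        }
    }

    /**
     * Runs the attempt, retrying only on HTTP 503 and doubling the pause between attempts.
     * Any other status, or a 503 on the last allowed attempt, is rethrown unchanged.
     */
    static <T> T retryOn503(Supplier<T> attempt, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoffMillis = initialBackoffMillis;
        for (int attemptNo = 1; ; attemptNo++) {
            try {
                return attempt.get();
            } catch (HttpStatusException e) {
                if (e.statusCode != 503 || attemptNo >= maxAttempts) {
                    throw e;
                }
            }
            try {
                Thread.sleep(backoffMillis);
            } catch (InterruptedException ie) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while backing off", ie);
            }
            backoffMillis *= 2;
        }
    }
}
```

A caller would wrap a single upload step in `retryOn503(...)`; the point of the sketch is only that a 503 leads to another attempt while every other failure propagates as-is.
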
Bump google-cloud-storage 2.29.1 -> 2.68.0 and the matching grpc artifacts
1.59.1 -> 1.81.0, regenerate the bundled-dependency NOTICE accordingly, add the
bundled license file for the newly bundled stax2-api, and update the version
links in the GCS filesystem documentation.
Add integration tests that run against a real GCS bucket, mirroring the existing
S3 filesystem integration tests. They are skipped unless a bucket is configured
via the IT_CASE_GCS_BUCKET environment variable; authentication uses Application
Default Credentials (GOOGLE_APPLICATION_CREDENTIALS).
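
The skip condition can be sketched without any Flink or JUnit dependency. The class below mirrors the logic of the `GSTestCredentials` helper added in this commit (a blank or missing bucket name means "skip", otherwise the tests use `gs://<bucket>/temp/`); the class name `GcsTestBucketSketch` and the use of `Optional` are assumptions made for illustration, not part of the commit.

```java
import java.util.Optional;

final class GcsTestBucketSketch {

    /**
     * Returns the base URI the integration tests would write under,
     * or empty if the bucket name is missing or blank (tests skipped).
     */
    static Optional<String> testBucketUri(String bucketName) {
        if (bucketName == null || bucketName.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of("gs://" + bucketName + "/temp/");
    }

    public static void main(String[] args) {
        // IT_CASE_GCS_BUCKET is the environment variable named in the commit message.
        Optional<String> uri = testBucketUri(System.getenv("IT_CASE_GCS_BUCKET"));
        System.out.println(
                uri.map(u -> "tests would run against " + u)
                        .orElse("tests would be skipped: IT_CASE_GCS_BUCKET not set"));
    }
}
```

Running `main` with and without `IT_CASE_GCS_BUCKET` exported shows which branch the integration tests would take.
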
Generated-by: Claude Code (Opus 4.8)
---
docs/content.zh/docs/deployment/filesystems/gcs.md | 2 +-
docs/content/docs/deployment/filesystems/gcs.md | 2 +-
.../cce59259-d3cb-4048-a1bd-31197f847189 | 12 ++
flink-filesystems/flink-gs-fs-hadoop/pom.xml | 18 ++-
.../src/main/resources/META-INF/NOTICE | 129 ++++++++++++--------
.../resources/META-INF/licenses/LICENSE.stax2-api | 22 ++++
.../flink/fs/gs/GSFileSystemBehaviorITCase.java | 69 +++++++++++
.../flink/fs/gs/GSRecoverableWriterITCase.java | 132 +++++++++++++++++++++
.../org/apache/flink/fs/gs/GSTestCredentials.java | 53 +++++++++
9 files changed, 383 insertions(+), 56 deletions(-)
diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md
index 19263b112c4..0b971b24747 100644
--- a/docs/content.zh/docs/deployment/filesystems/gcs.md
+++ b/docs/content.zh/docs/deployment/filesystems/gcs.md
@@ -71,7 +71,7 @@ Note that these examples are *not* exhaustive and you can use GCS in other place
Flink provides the `flink-gs-fs-hadoop` file system to write to GCS.
This implementation is self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use it.
-`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.29.1) library to provide `RecoverableWriter` support.
+`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.68.0) library to provide `RecoverableWriter` support.
This file system can be used with the [FileSystem connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}).
diff --git a/docs/content/docs/deployment/filesystems/gcs.md b/docs/content/docs/deployment/filesystems/gcs.md
index 5a828d12b29..164dbcc2615 100644
--- a/docs/content/docs/deployment/filesystems/gcs.md
+++ b/docs/content/docs/deployment/filesystems/gcs.md
@@ -71,7 +71,7 @@ Note that these examples are *not* exhaustive and you can use GCS in other place
Flink provides the `flink-gs-fs-hadoop` file system to write to GCS.
This implementation is self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use it.
-`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.29.1) library to provide `RecoverableWriter` support.
+`flink-gs-fs-hadoop` registers a `FileSystem` wrapper for URIs with the *gs://* scheme. It uses Google's [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop3-2.2.18) Hadoop library to access GCS. It also uses Google's [google-cloud-storage](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage/2.68.0) library to provide `RecoverableWriter` support.
This file system can be used with the [FileSystem connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}).
diff --git a/flink-filesystems/flink-gs-fs-hadoop/archunit-violations/cce59259-d3cb-4048-a1bd-31197f847189 b/flink-filesystems/flink-gs-fs-hadoop/archunit-violations/cce59259-d3cb-4048-a1bd-31197f847189
index e69de29bb2d..f3c98676934 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/archunit-violations/cce59259-d3cb-4048-a1bd-31197f847189
+++ b/flink-filesystems/flink-gs-fs-hadoop/archunit-violations/cce59259-d3cb-4048-a1bd-31197f847189
@@ -0,0 +1,12 @@
+org.apache.flink.fs.gs.GSFileSystemBehaviorITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.fs.gs.GSRecoverableWriterITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
diff --git a/flink-filesystems/flink-gs-fs-hadoop/pom.xml b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
index cfbafe3ace3..babc87e422a 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
@@ -34,11 +34,11 @@ under the License.
<properties>
<!-- If updating these dependency versions, please also update the corresponding links -->
<!-- in the GCS file system documentation. -->
- <fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
+ <fs.gs.sdk.version>2.68.0</fs.gs.sdk.version>
<fs.gs.connector.version>hadoop3-2.2.18</fs.gs.connector.version>
<fs.gs.cloud.nio.version>0.128.7</fs.gs.cloud.nio.version>
<!-- Set this to the highest version of grpc artifacts from gcs-connector and google-cloud-storage -->
- <fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
+ <fs.gs.grpc.version>1.81.0</fs.gs.grpc.version>
</properties>
<dependencies>
@@ -50,6 +50,15 @@ under the License.
<scope>provided</scope>
</dependency>
+ <!-- for the FileSystemBehaviorTestSuite used by GSFileSystemBehaviorITCase -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
<!-- ArchUnit test dependencies -->
<dependency>
@@ -103,6 +112,11 @@ under the License.
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</exclusion>
+ <!-- static-analysis nullness annotations, not needed at runtime -->
+ <exclusion>
+ <groupId>org.jspecify</groupId>
+ <artifactId>jspecify</artifactId>
+ </exclusion>
<!-- exclude dependency because of its GPLv2 license, see https://github.com/apache/flink/pull/15599#issuecomment-850241316 -->
<exclusion>
<groupId>javax.annotation</groupId>
diff --git
a/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
b/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
index 805f3d724b2..867ec9c5069 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/NOTICE
@@ -6,58 +6,67 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- com.fasterxml.jackson.core:jackson-annotations:2.20
- com.fasterxml.jackson.core:jackson-core:2.20.1
+- com.fasterxml.jackson.core:jackson-databind:2.20.1
+- com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.20.1
+- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.20.1
+- com.fasterxml.woodstox:woodstox-core:7.0.0
- com.google.android:annotations:4.1.1.4
- com.google.api-client:google-api-client-jackson2:2.0.1
-- com.google.api-client:google-api-client:2.2.0
-- com.google.api.grpc:gapic-google-cloud-storage-v2:2.29.1-alpha
-- com.google.api.grpc:grpc-google-cloud-storage-v2:2.29.1-alpha
-- com.google.api.grpc:proto-google-cloud-monitoring-v3:1.64.0
-- com.google.api.grpc:proto-google-cloud-storage-v2:2.29.1-alpha
-- com.google.api.grpc:proto-google-common-protos:2.28.0
-- com.google.api.grpc:proto-google-iam-v1:1.23.0
+- com.google.api-client:google-api-client:2.7.2
+- com.google.api.grpc:gapic-google-cloud-storage-v2:2.68.0
+- com.google.api.grpc:grpc-google-cloud-storage-v2:2.68.0
+- com.google.api.grpc:proto-google-cloud-monitoring-v3:3.52.0
+- com.google.api.grpc:proto-google-cloud-storage-v2:2.68.0
+- com.google.api.grpc:proto-google-common-protos:2.71.0
+- com.google.api.grpc:proto-google-iam-v1:1.66.0
- com.google.apis:google-api-services-iamcredentials:v1-rev20211203-2.0.0
-- com.google.apis:google-api-services-storage:v1-rev20231028-2.0.0
-- com.google.auto.value:auto-value-annotations:1.10.4
+- com.google.apis:google-api-services-storage:v1-rev20260204-2.0.0
+- com.google.auto.value:auto-value-annotations:1.11.0
- com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.18
- com.google.cloud.bigdataoss:gcsio:2.2.18
- com.google.cloud.bigdataoss:util-hadoop:hadoop3-2.2.18
- com.google.cloud.bigdataoss:util:2.2.18
-- com.google.cloud:google-cloud-core-grpc:2.27.0
-- com.google.cloud:google-cloud-core-http:2.27.0
-- com.google.cloud:google-cloud-core:2.27.0
-- com.google.cloud:google-cloud-monitoring:1.82.0
-- com.google.cloud:google-cloud-storage:2.29.1
-- com.google.code.gson:gson:2.10.1
+- com.google.cloud.opentelemetry:detector-resources-support:0.33.0
+- com.google.cloud.opentelemetry:exporter-metrics:0.33.0
+- com.google.cloud.opentelemetry:shared-resourcemapping:0.33.0
+- com.google.cloud:google-cloud-core-grpc:2.70.0
+- com.google.cloud:google-cloud-core-http:2.70.0
+- com.google.cloud:google-cloud-core:2.70.0
+- com.google.cloud:google-cloud-monitoring:3.52.0
+- com.google.cloud:google-cloud-storage:2.68.0
+- com.google.code.gson:gson:2.13.2
- com.google.flogger:flogger-system-backend:0.7.1
- com.google.flogger:flogger:0.7.1
- com.google.flogger:google-extensions:0.7.1
- com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
-- com.google.http-client:google-http-client-apache-v2:1.43.3
-- com.google.http-client:google-http-client-appengine:1.43.3
-- com.google.http-client:google-http-client-gson:1.43.3
-- com.google.http-client:google-http-client-jackson2:1.43.3
-- com.google.http-client:google-http-client:1.43.3
-- com.google.oauth-client:google-oauth-client:1.34.1
+- com.google.http-client:google-http-client-apache-v2:2.1.0
+- com.google.http-client:google-http-client-appengine:2.1.0
+- com.google.http-client:google-http-client-gson:2.1.0
+- com.google.http-client:google-http-client-jackson2:2.1.0
+- com.google.http-client:google-http-client:2.1.0
+- com.google.oauth-client:google-oauth-client:1.39.0
- com.lmax:disruptor:3.4.2
- commons-codec:commons-codec:1.15
-- io.grpc:grpc-alts:1.59.1
-- io.grpc:grpc-api:1.59.1
-- io.grpc:grpc-auth:1.59.1
-- io.grpc:grpc-census:1.59.1
-- io.grpc:grpc-context:1.59.1
-- io.grpc:grpc-core:1.59.1
-- io.grpc:grpc-googleapis:1.59.1
-- io.grpc:grpc-grpclb:1.59.1
-- io.grpc:grpc-inprocess:1.59.1
-- io.grpc:grpc-netty-shaded:1.59.1
-- io.grpc:grpc-protobuf-lite:1.59.1
-- io.grpc:grpc-protobuf:1.59.1
-- io.grpc:grpc-rls:1.59.1
-- io.grpc:grpc-services:1.59.1
-- io.grpc:grpc-stub:1.59.1
-- io.grpc:grpc-util:1.59.1
-- io.grpc:grpc-xds:1.59.1
+- io.grpc:grpc-alts:1.81.0
+- io.grpc:grpc-api:1.81.0
+- io.grpc:grpc-auth:1.81.0
+- io.grpc:grpc-census:1.81.0
+- io.grpc:grpc-context:1.81.0
+- io.grpc:grpc-core:1.81.0
+- io.grpc:grpc-googleapis:1.81.0
+- io.grpc:grpc-grpclb:1.81.0
+- io.grpc:grpc-inprocess:1.81.0
+- io.grpc:grpc-netty-shaded:1.81.0
+- io.grpc:grpc-opentelemetry:1.81.0
+- io.grpc:grpc-protobuf-lite:1.81.0
+- io.grpc:grpc-protobuf:1.81.0
+- io.grpc:grpc-rls:1.81.0
+- io.grpc:grpc-services:1.81.0
+- io.grpc:grpc-stub:1.81.0
+- io.grpc:grpc-util:1.81.0
+- io.grpc:grpc-xds:1.81.0
- io.opencensus:opencensus-api:0.31.1
- io.opencensus:opencensus-contrib-exemplar-util:0.31.0
- io.opencensus:opencensus-contrib-grpc-metrics:0.31.0
@@ -65,10 +74,20 @@ This project bundles the following dependencies under the Apache Software Licens
- io.opencensus:opencensus-contrib-resource-util:0.31.0
- io.opencensus:opencensus-exporter-metrics-util:0.31.0
- io.opencensus:opencensus-exporter-stats-stackdriver:0.31.0
-- io.opencensus:opencensus-impl:0.31.0
- io.opencensus:opencensus-impl-core:0.31.0
-- io.opencensus:opencensus-proto:0.2.0
-- io.perfmark:perfmark-api:0.26.0
+- io.opencensus:opencensus-impl:0.31.0
+- io.opentelemetry.contrib:opentelemetry-gcp-resources:1.37.0-alpha
+- io.opentelemetry.semconv:opentelemetry-semconv:1.29.0-alpha
+- io.opentelemetry:opentelemetry-api:1.57.0
+- io.opentelemetry:opentelemetry-common:1.57.0
+- io.opentelemetry:opentelemetry-context:1.57.0
+- io.opentelemetry:opentelemetry-sdk-common:1.57.0
+- io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:1.57.0
+- io.opentelemetry:opentelemetry-sdk-logs:1.57.0
+- io.opentelemetry:opentelemetry-sdk-metrics:1.57.0
+- io.opentelemetry:opentelemetry-sdk-trace:1.57.0
+- io.opentelemetry:opentelemetry-sdk:1.57.0
+- io.perfmark:perfmark-api:0.27.0
- org.apache.httpcomponents:httpclient:4.5.13
- org.apache.httpcomponents:httpcore:4.4.14
- org.conscrypt:conscrypt-openjdk-uber:2.5.2
@@ -76,21 +95,27 @@ This project bundles the following dependencies under the Apache Software Licens
This project bundles the following dependencies under BSD-3 License (https://opensource.org/licenses/BSD-3-Clause).
See bundled license files for details.
-- com.google.api:api-common:2.20.0
-- com.google.api:gax-grpc:2.37.0
-- com.google.api:gax-httpjson:2.37.0
-- com.google.api:gax:2.37.0
-- com.google.auth:google-auth-library-credentials:1.20.0
-- com.google.auth:google-auth-library-oauth2-http:1.20.0
-- com.google.protobuf:protobuf-java-util:3.24.4
-- com.google.protobuf:protobuf-java:3.24.4
-- org.threeten:threetenbp:1.6.8
+- com.google.api:api-common:2.63.0
+- com.google.api:gax-grpc:2.80.0
+- com.google.api:gax-httpjson:2.80.0
+- com.google.api:gax:2.80.0
+- com.google.auth:google-auth-library-credentials:1.47.0
+- com.google.auth:google-auth-library-oauth2-http:1.47.0
+- com.google.protobuf:protobuf-java-util:4.33.2
+- com.google.protobuf:protobuf-java:4.33.2
+- org.threeten:threetenbp:1.7.0
+
+This project bundles the following dependencies under BSD License (https://opensource.org/licenses/bsd-license.php).
+See bundled license files for details.
+
+- org.codehaus.woodstox:stax2-api:4.2.2 (https://github.com/FasterXML/stax2-api/tree/stax2-api-4.2.2)
This project bundles the following dependencies under the Go License (https://golang.org/LICENSE).
See bundled license files for details.
-- com.google.re2j:re2j:1.7
+
+- com.google.re2j:re2j:1.8
This project bundles the following dependencies under the MIT License.
See bundled license files for details.
-- org.codehaus.mojo:animal-sniffer-annotations:1.23
+- org.codehaus.mojo:animal-sniffer-annotations:1.27
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/licenses/LICENSE.stax2-api b/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/licenses/LICENSE.stax2-api
new file mode 100644
index 00000000000..0ed63616996
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/resources/META-INF/licenses/LICENSE.stax2-api
@@ -0,0 +1,22 @@
+Copyright woodstox stax2api contributors.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice,
+   this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemBehaviorITCase.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemBehaviorITCase.java
new file mode 100644
index 00000000000..68cb21c759e
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemBehaviorITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link FileSystemBehaviorTestSuite} for the Google Cloud Storage file
+ * system.
+ *
+ * <p>Runs against a real GCS bucket and is skipped unless one is configured; see {@link
+ * GSTestCredentials}.
+ */
+class GSFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+    private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+    @BeforeAll
+    static void checkCredentialsAndSetup() {
+        GSTestCredentials.assumeCredentialsAvailable();
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @AfterAll
+    static void clearFsConfig() throws IOException {
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @Override
+    protected FileSystem getFileSystem() throws Exception {
+        return getBasePath().getFileSystem();
+    }
+
+    @Override
+    protected Path getBasePath() throws Exception {
+        return new Path(GSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
+    }
+
+    @Override
+    protected FileSystemKind getFileSystemKind() {
+        return FileSystemKind.OBJECT_STORE;
+    }
+}
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSRecoverableWriterITCase.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSRecoverableWriterITCase.java
new file mode 100644
index 00000000000..fd25ee48b56
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSRecoverableWriterITCase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for the Google Cloud Storage {@link RecoverableWriter} against a real GCS
+ * bucket. Exercises the write / checkpoint ({@code persist}) / recover / commit flow that backs
+ * exactly-once {@code FileSink} checkpointing on GCS.
+ *
+ * <p>Skipped unless a GCS bucket is configured; see {@link GSTestCredentials}.
+ */
+class GSRecoverableWriterITCase {
+
+    private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+    private static FileSystem fileSystem;
+    private static Path basePath;
+
+    private Path targetFile;
+
+    @BeforeAll
+    static void checkCredentialsAndSetup() throws Exception {
+        GSTestCredentials.assumeCredentialsAvailable();
+        FileSystem.initialize(new Configuration(), null);
+        basePath = new Path(GSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
+        fileSystem = basePath.getFileSystem();
+    }
+
+    @AfterAll
+    static void teardown() throws Exception {
+        if (fileSystem != null) {
+            fileSystem.delete(basePath, true);
+        }
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        targetFile = new Path(basePath, UUID.randomUUID().toString());
+    }
+
+    @AfterEach
+    void afterEach() throws Exception {
+        fileSystem.delete(targetFile, false);
+    }
+
+    @Test
+    void testWriteAndCommit() throws Exception {
+        final byte[] data = randomData(4 * 1024 * 1024);
+
+        final RecoverableWriter writer = fileSystem.createRecoverableWriter();
+        final RecoverableFsDataOutputStream stream = writer.open(targetFile);
+        stream.write(data);
+        stream.closeForCommit().commit();
+
+        assertThat(readFully(targetFile)).isEqualTo(data);
+    }
+
+    @Test
+    void testPersistRecoverAndCommit() throws Exception {
+        final byte[] data = randomData(5 * 1024 * 1024);
+        final int checkpointOffset = 2 * 1024 * 1024;
+
+        final RecoverableWriter writer = fileSystem.createRecoverableWriter();
+
+        // write up to the checkpoint offset, then take a checkpoint
+        final RecoverableFsDataOutputStream stream = writer.open(targetFile);
+        stream.write(data, 0, checkpointOffset);
+        final RecoverableWriter.ResumeRecoverable checkpoint = stream.persist();
+
+        // simulate a failure: abandon the original stream and recover from the checkpoint
+        final RecoverableFsDataOutputStream recovered = writer.recover(checkpoint);
+        recovered.write(data, checkpointOffset, data.length - checkpointOffset);
+        recovered.closeForCommit().commit();
+
+        // the committed object must contain exactly the bytes written across the recovery
+        assertThat(readFully(targetFile)).isEqualTo(data);
+    }
+
+    private static byte[] randomData(int size) {
+        final byte[] data = new byte[size];
+        new Random(42).nextBytes(data);
+        return data;
+    }
+
+    private static byte[] readFully(Path path) throws Exception {
+        try (FSDataInputStream in = fileSystem.open(path);
+                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            final byte[] buffer = new byte[64 * 1024];
+            int read;
+            while ((read = in.read(buffer)) > 0) {
+                out.write(buffer, 0, read);
+            }
+            return out.toByteArray();
+        }
+    }
+}
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSTestCredentials.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSTestCredentials.java
new file mode 100644
index 00000000000..0b26f2ccef9
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSTestCredentials.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import javax.annotation.Nullable;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Access to the GCS bucket used by the integration tests in this module.
+ *
+ * <p>The integration tests are skipped unless the {@code IT_CASE_GCS_BUCKET} environment variable
+ * names a writable bucket. Authentication uses Application Default Credentials, so the {@code
+ * GOOGLE_APPLICATION_CREDENTIALS} environment variable should point to a service-account key file
+ * (the method recommended by the GCS filesystem documentation), which feeds both the gcs-connector
+ * and the google-cloud-storage library used by {@code RecoverableWriter}.
+ */
+class GSTestCredentials {
+
+    @Nullable private static final String GCS_TEST_BUCKET = System.getenv("IT_CASE_GCS_BUCKET");
+
+    /** Skips the calling test unless a GCS test bucket is configured. */
+    static void assumeCredentialsAvailable() {
+        assumeThat(GCS_TEST_BUCKET)
+                .as("No GCS test bucket configured via the IT_CASE_GCS_BUCKET environment variable")
+                .isNotBlank();
+    }
+
+    /** Returns the URI of the test bucket, e.g. {@code gs://my-bucket/temp/}. */
+    static String getTestBucketUri() {
+        if (GCS_TEST_BUCKET == null) {
+            throw new IllegalStateException(
+                    "GCS test bucket not available (IT_CASE_GCS_BUCKET not set)");
+        }
+        return "gs://" + GCS_TEST_BUCKET + "/temp/";
+    }
+}