This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fb0beb0a3ff NIFI-15754 Add Google Cloud Storage Provider for Iceberg
(#11052)
fb0beb0a3ff is described below
commit fb0beb0a3ff4ccc0adabd0a601e5f47e1095d063
Author: David Handermann <[email protected]>
AuthorDate: Mon Mar 30 16:02:53 2026 -0500
NIFI-15754 Add Google Cloud Storage Provider for Iceberg (#11052)
* NIFI-15754 Added Google Cloud Storage Provider for Iceberg
- Added Iceberg GCS module and NAR
- Added Iceberg FileIO implementation using Java HttpClient for GCS REST
operations
- Added retry strategy to HttpClientProvider
---
nifi-assembly/pom.xml | 6 +
.../nifi-iceberg-gcs-nar/pom.xml | 74 +++++
.../src/main/resources/META-INF/LICENSE | 209 +++++++++++++
.../nifi-iceberg-bundle/nifi-iceberg-gcs/pom.xml | 55 ++++
.../iceberg/gcs/AuthenticationStrategy.java | 50 +++
.../iceberg/gcs/GCSIcebergFileIOProvider.java | 62 ++++
.../iceberg/gcs/GoogleCloudStorageFileIO.java | 145 +++++++++
.../iceberg/gcs/GoogleCloudStorageHeader.java | 56 ++++
.../iceberg/gcs/GoogleCloudStorageInputFile.java | 169 ++++++++++
.../iceberg/gcs/GoogleCloudStorageLocation.java | 81 +++++
.../iceberg/gcs/GoogleCloudStorageOutputFile.java | 98 ++++++
.../GoogleCloudStoragePositionOutputStream.java | 225 ++++++++++++++
.../iceberg/gcs/GoogleCloudStorageProperties.java | 140 +++++++++
.../iceberg/gcs/GoogleCloudStorageProperty.java | 47 +++
.../gcs/GoogleCloudStorageSeekableInputStream.java | 284 +++++++++++++++++
.../services/iceberg/gcs/HttpClientProvider.java | 154 ++++++++++
.../services/iceberg/gcs/HttpRequestException.java | 27 ++
.../iceberg/gcs/HttpResponseException.java | 27 ++
.../services/iceberg/gcs/KeyDigestProvider.java | 43 +++
.../org.apache.nifi.controller.ControllerService | 15 +
.../iceberg/gcs/GCSIcebergFileIOProviderTest.java | 89 ++++++
.../iceberg/gcs/GoogleCloudStorageFileIOTest.java | 341 +++++++++++++++++++++
.../gcs/GoogleCloudStorageLocationTest.java | 109 +++++++
.../iceberg/gcs/HttpClientProviderTest.java | 141 +++++++++
.../iceberg/gcs/KeyDigestProviderTest.java | 34 ++
nifi-extension-bundles/nifi-iceberg-bundle/pom.xml | 2 +
26 files changed, 2683 insertions(+)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index def628fa54e..eb0d6e21918 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -885,6 +885,12 @@ language governing permissions and limitations under the
License. -->
<version>2.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-gcs-nar</artifactId>
+ <version>2.9.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-iceberg-parquet-writer-nar</artifactId>
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/pom.xml
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/pom.xml
new file mode 100644
index 00000000000..2de9b861b8a
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <version>2.9.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-iceberg-gcs-nar</artifactId>
+ <packaging>nar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-gcs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-service-api-nar</artifactId>
+ <version>${project.version}</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+
+ <dependencyManagement>
+ <dependencies>
+ <!-- Provided in shared NAR -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-bundled-guava</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+</project>
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/src/main/resources/META-INF/LICENSE
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 00000000000..44893cdb29d
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/pom.xml
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/pom.xml
new file mode 100644
index 00000000000..d830b801552
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-bundle</artifactId>
+ <version>2.9.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-iceberg-gcs</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-iceberg-service-api</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Provided in shared NAR -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver3</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/AuthenticationStrategy.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/AuthenticationStrategy.java
new file mode 100644
index 00000000000..97b8e936dd7
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/AuthenticationStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Authentication Types for Google Cloud Storage
access
+ */
+public enum AuthenticationStrategy implements DescribedValue {
+ VENDED_CREDENTIALS("Vended Credentials", "Authentication using credentials
supplied from Iceberg Catalog");
+
+ private final String displayName;
+
+ private final String description;
+
+ AuthenticationStrategy(final String displayName, final String description)
{
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProvider.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProvider.java
new file mode 100644
index 00000000000..7a5c5a648b6
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.FileIO;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergFileIOProvider;
+import org.apache.nifi.services.iceberg.ProviderContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@Tags({"google", "cloud", "storage", "iceberg", "gcp"})
+@CapabilityDescription("Provides Google Cloud Storage file input and output
support for Apache Iceberg tables")
+public class GCSIcebergFileIOProvider extends AbstractControllerService
implements IcebergFileIOProvider {
+
+ static final PropertyDescriptor AUTHENTICATION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Authentication Strategy")
+ .description("Strategy for authenticating with Google Cloud
Storage services")
+ .required(true)
+ .allowableValues(AuthenticationStrategy.class)
+ .defaultValue(AuthenticationStrategy.VENDED_CREDENTIALS)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ AUTHENTICATION_STRATEGY
+ );
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ public FileIO getFileIO(final ProviderContext providerContext) {
+ Objects.requireNonNull(providerContext, "Provider Context required");
+ final Map<String, String> contextProperties =
providerContext.getProperties();
+ Objects.requireNonNull(contextProperties, "Context properties
required");
+
+ final GoogleCloudStorageFileIO fileIO = new GoogleCloudStorageFileIO();
+ fileIO.initialize(contextProperties);
+ return fileIO;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIO.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIO.java
new file mode 100644
index 00000000000..1771b021a24
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIO.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serial;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Collections;
+import java.util.Map;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_NO_CONTENT;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.OAUTH2_TOKEN;
+
+/**
+ * Google Cloud Storage implementation of Iceberg FileIO using Java HttpClient
for REST API operations
+ */
+class GoogleCloudStorageFileIO implements FileIO {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger logger =
LoggerFactory.getLogger(GoogleCloudStorageFileIO.class);
+
+ private Map<String, String> properties = Collections.emptyMap();
+
+ private transient GoogleCloudStorageProperties storageProperties;
+ private transient HttpClientProvider httpClientProvider;
+
+ /**
+ * Initialize FileIO with standard properties defined in Apache Iceberg
GCPProperties class
+ *
+ * @param properties Properties defined according to Iceberg GCPProperties
+ */
+ @Override
+ public void initialize(final Map<String, String> properties) {
+ this.properties = Map.copyOf(properties);
+ this.storageProperties = new GoogleCloudStorageProperties(properties);
+
+ final String bearerToken = properties.get(OAUTH2_TOKEN.getProperty());
+ this.httpClientProvider = new HttpClientProvider(bearerToken);
+ }
+
+ /**
+ * Create Iceberg Input File with unspecified length
+ *
+ * @param path Input File Path
+ * @return Input File with unspecified length
+ */
+ @Override
+ public InputFile newInputFile(final String path) {
+ return new GoogleCloudStorageInputFile(httpClientProvider,
storageProperties, GoogleCloudStorageLocation.parse(path), path, null);
+ }
+
+ /**
+ * Create Iceberg Input File with length specified
+ *
+ * @param path Input File Path
+ * @param length Input File Length in bytes
+ * @return Input File with length specified
+ */
+ @Override
+ public InputFile newInputFile(final String path, final long length) {
+ return new GoogleCloudStorageInputFile(httpClientProvider,
storageProperties, GoogleCloudStorageLocation.parse(path), path, length);
+ }
+
+ /**
+ * Create Iceberg Output File
+ *
+ * @param path Output File Path
+ * @return Output File
+ */
+ @Override
+ public OutputFile newOutputFile(final String path) {
+ return new GoogleCloudStorageOutputFile(httpClientProvider,
storageProperties, GoogleCloudStorageLocation.parse(path), path);
+ }
+
+ /**
+ * Delete File at specified location
+ *
+ * @param path Location of file to be deleted
+ */
+ @Override
+ public void deleteFile(final String path) {
+ final GoogleCloudStorageLocation location =
GoogleCloudStorageLocation.parse(path);
+ final String uri = storageProperties.metadataUri(location);
+ final HttpRequest request =
httpClientProvider.newRequestBuilder(uri).DELETE().build();
+ try {
+ final HttpResponse<String> response =
httpClientProvider.send(request, HttpResponse.BodyHandlers.ofString());
+ final int statusCode = response.statusCode();
+
+ if (HTTP_NO_CONTENT == statusCode || HTTP_NOT_FOUND == statusCode)
{
+ logger.debug("Delete File [{}] completed: HTTP {}", path,
statusCode);
+ } else {
+ final String responseBody = response.body();
+ throw new HttpResponseException("Delete File [%s] failed: HTTP
%d [%s]".formatted(path, statusCode, responseBody));
+ }
+ } catch (final IOException e) {
+ throw new HttpRequestException("Delete File [%s]
failed".formatted(path), e);
+ }
+ }
+
+ /**
+ * Get current configuration properties
+ *
+ * @return Configuration properties
+ */
+ @Override
+ public Map<String, String> properties() {
+ return properties;
+ }
+
+ /**
+ * Close client resources
+ */
+ @Override
+ public void close() {
+ if (httpClientProvider == null) {
+ logger.warn("Closed before initialized");
+ } else {
+ httpClientProvider.close();
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageHeader.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageHeader.java
new file mode 100644
index 00000000000..a6ab5f53fff
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageHeader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+/**
+ * HTTP header names used in Google Cloud Storage JSON API requests and
responses
+ */
+enum GoogleCloudStorageHeader {
+ /** RFC 9110 Section 11.6.2 */
+ AUTHORIZATION("Authorization"),
+
+ /** RFC 9110 Section 14.4 */
+ CONTENT_RANGE("Content-Range"),
+
+ /** Google Header for customer-supplied encryption keys */
+ ENCRYPTION_ALGORITHM("x-goog-encryption-algorithm"),
+
+ /** Google Header for customer-supplied encryption keys containing
Base64-encoded values */
+ ENCRYPTION_KEY("x-goog-encryption-key"),
+
+ /** Google Header for customer-supplied encryption keys containing
Base64-encoded values */
+ ENCRYPTION_KEY_SHA256("x-goog-encryption-key-sha256"),
+
+ /** RFC 9110 Section 10.2.2 */
+ LOCATION("Location"),
+
+ /** RFC 9110 Section 14.2 */
+ RANGE("Range"),
+
+ /** Content Type for payload of Google Cloud Storage resumable uploads */
+ UPLOAD_CONTENT_TYPE("X-Upload-Content-Type");
+
+ private final String header;
+
+ GoogleCloudStorageHeader(final String header) {
+ this.header = header;
+ }
+
+ String getHeader() {
+ return header;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageInputFile.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageInputFile.java
new file mode 100644
index 00000000000..1641c8e601f
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageInputFile.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+import java.io.IOException;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+
+/**
+ * Google Cloud Storage implementation of Apache Iceberg InputFile using
HttpClient for REST operations
+ */
+class GoogleCloudStorageInputFile implements InputFile {
+
+ private static final String QUESTION_MARK = "?";
+ private static final char AMPERSAND = '&';
+ private static final String FIELDS_SIZE_QUERY = "fields=size";
+
+ private static final Pattern SIZE_PATTERN =
Pattern.compile("\"size\"\\s*:\\s*\"(\\d+)\"");
+ private static final int SIZE_GROUP = 1;
+
+ private final HttpClientProvider httpClientProvider;
+
+ private final GoogleCloudStorageProperties storageProperties;
+
+ private final GoogleCloudStorageLocation location;
+
+ private final String path;
+
+ private volatile Long cachedLength;
+
+ GoogleCloudStorageInputFile(
+ final HttpClientProvider httpClientProvider,
+ final GoogleCloudStorageProperties storageProperties,
+ final GoogleCloudStorageLocation location,
+ final String path,
+ final Long cachedLength
+ ) {
+ this.httpClientProvider = httpClientProvider;
+ this.storageProperties = storageProperties;
+ this.location = location;
+ this.path = path;
+ this.cachedLength = cachedLength;
+ }
+
+ /**
+ * Get Input File length in bytes and fetch remote object size when length
is not cached
+ *
+ * @return Input File length in bytes
+ */
+ @Override
+ public long getLength() {
+ if (cachedLength == null) {
+ cachedLength = getObjectSize();
+ }
+ return cachedLength;
+ }
+
+ /**
+ * Create Seekable InputStream for InputFile reading
+ *
+ * @return Seekable InputStream
+ */
+ @Override
+ public SeekableInputStream newStream() {
+ return new GoogleCloudStorageSeekableInputStream(httpClientProvider,
storageProperties, location, cachedLength);
+ }
+
+ /**
+ * Get InputFile Location
+ *
+ * @return InputFile Location
+ */
+ @Override
+ public String location() {
+ return path;
+ }
+
+ /**
+ * Get status of InputFile existence based on Metadata URI
+ *
+ * @return InputFile existence status
+ */
+ @Override
+ public boolean exists() {
+ final String uri = getMetadataUri();
+ final HttpRequest request =
httpClientProvider.newRequestBuilder(uri).GET().build();
+
+ try {
+ final HttpResponse<Void> response =
httpClientProvider.send(request, HttpResponse.BodyHandlers.discarding());
+ final int statusCode = response.statusCode();
+
+ final boolean exists;
+ if (statusCode == HTTP_OK) {
+ exists = true;
+ } else if (statusCode == HTTP_NOT_FOUND) {
+ exists = false;
+ } else {
+ throw new HttpResponseException("Metadata URI [%s] request
failed: HTTP %d".formatted(uri, statusCode));
+ }
+
+ return exists;
+ } catch (final IOException e) {
+ throw new HttpRequestException("Metadata URI [%s] request
failed".formatted(uri), e);
+ }
+ }
+
+ private long getObjectSize() {
+ final String uri = getMetadataUri();
+ final HttpRequest request =
httpClientProvider.newRequestBuilder(uri).GET().build();
+
+ try {
+ final HttpResponse<String> response =
httpClientProvider.send(request, HttpResponse.BodyHandlers.ofString());
+ final int statusCode = response.statusCode();
+ final String responseBody = response.body();
+
+ if (HTTP_OK == statusCode) {
+ final Matcher sizeMatcher = SIZE_PATTERN.matcher(responseBody);
+ if (sizeMatcher.find()) {
+ final String sizeGroup = sizeMatcher.group(SIZE_GROUP);
+ return Long.parseLong(sizeGroup);
+ } else {
+ throw new HttpResponseException("Metadata URI [%s]
response parsing failed: HTTP %d size not found [%s]".formatted(uri,
statusCode, responseBody));
+ }
+ } else {
+ throw new HttpResponseException("Metadata URI [%s] request
failed: HTTP %d [%s]".formatted(uri, statusCode, responseBody));
+ }
+ } catch (final IOException e) {
+ throw new HttpRequestException("Metadata URI [%s] request
failed".formatted(uri), e);
+ }
+ }
+
+ private String getMetadataUri() {
+ final String baseUri = storageProperties.metadataUri(location);
+
+ final StringBuilder builder = new StringBuilder(baseUri);
+
+ if (baseUri.contains(QUESTION_MARK)) {
+ builder.append(AMPERSAND);
+ } else {
+ builder.append(QUESTION_MARK);
+ }
+
+ builder.append(FIELDS_SIZE_QUERY);
+
+ return builder.toString();
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocation.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocation.java
new file mode 100644
index 00000000000..2d617a4b6bd
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+
/**
 * Parsed representation of Google Cloud Storage URI
 */
record GoogleCloudStorageLocation(
        String bucket,
        String objectKey,
        String encodedObjectKey
) {

    private static final String GS_SCHEME = "gs://";

    private static final char FORWARD_SLASH = '/';

    private static final String PLUS = "+";

    private static final String SPACE_ENCODED = "%20";

    private static final String URI_FORMAT = "gs://%s/%s";

    /**
     * Parse URI to Google Cloud Storage Location
     *
     * @param uri Google Cloud Storage URI
     * @return Parsed Location
     * @throws IllegalArgumentException when the URI is null, lacks the gs scheme, or lacks a bucket or object key
     */
    static GoogleCloudStorageLocation parse(final String uri) {
        if (uri == null) {
            throw new IllegalArgumentException("URI required");
        }
        if (!uri.startsWith(GS_SCHEME)) {
            throw new IllegalArgumentException("URI [%s] missing required scheme [gs]".formatted(uri));
        }

        final String bucketAndKey = uri.substring(GS_SCHEME.length());
        final int separatorIndex = bucketAndKey.indexOf(FORWARD_SLASH);
        // Index 0 would indicate an empty bucket name and -1 indicates no object key segment at all
        if (separatorIndex < 1) {
            throw new IllegalArgumentException("URI [%s] missing bucket and object objectKey".formatted(uri));
        }

        final String bucketName = bucketAndKey.substring(0, separatorIndex);
        final String key = bucketAndKey.substring(separatorIndex + 1);
        if (key.isEmpty()) {
            throw new IllegalArgumentException("URI [%s] empty object objectKey".formatted(uri));
        }

        return new GoogleCloudStorageLocation(bucketName, key, getEncodedObjectKey(key));
    }

    // URLEncoder produces form encoding where spaces become plus signs: convert to percent encoding for URI paths
    private static String getEncodedObjectKey(final String objectKey) {
        return URLEncoder.encode(objectKey, StandardCharsets.UTF_8).replace(PLUS, SPACE_ENCODED);
    }

    @Override
    public String toString() {
        return URI_FORMAT.formatted(bucket, objectKey);
    }
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageOutputFile.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageOutputFile.java
new file mode 100644
index 00000000000..0127999f47a
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageOutputFile.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/**
+ * Google Cloud Storage implementation of Apache Iceberg OutputFile using
HttpClient for REST Operations
+ */
+class GoogleCloudStorageOutputFile implements OutputFile {
+
+ private final HttpClientProvider httpClientProvider;
+
+ private final GoogleCloudStorageProperties storageProperties;
+
+ private final GoogleCloudStorageLocation location;
+
+ private final String path;
+
+ GoogleCloudStorageOutputFile(
+ final HttpClientProvider httpClientProvider,
+ final GoogleCloudStorageProperties storageProperties,
+ final GoogleCloudStorageLocation location,
+ final String path
+ ) {
+ this.httpClientProvider = httpClientProvider;
+ this.storageProperties = storageProperties;
+ this.location = location;
+ this.path = path;
+ }
+
+ /**
+ * Create OutputStream after checking for existing InputFile to avoid
potential conflicts
+ *
+ * @return OutputStream for writing
+ */
+ @Override
+ public PositionOutputStream create() {
+ if (toInputFile().exists()) {
+ throw new AlreadyExistsException("File already exists [%s]", path);
+ }
+ return createOrOverwrite();
+ }
+
+ /**
+ * Create OutputStream or overwrite existing file when necessary
+ *
+ * @return OutputStream for writing
+ */
+ @Override
+ public PositionOutputStream createOrOverwrite() {
+ try {
+ return new
GoogleCloudStoragePositionOutputStream(httpClientProvider, storageProperties,
location);
+ } catch (final IOException e) {
+ throw new UncheckedIOException("Failed to create stream
[%s]".formatted(path), e);
+ }
+ }
+
+ /**
+ * Get Location of OutputFile
+ *
+ * @return Location
+ */
+ @Override
+ public String location() {
+ return path;
+ }
+
+ /**
+ * Get InputFile corresponding to location of current OutputFile
+ *
+ * @return InputFile for current location
+ */
+ @Override
+ public InputFile toInputFile() {
+ return new GoogleCloudStorageInputFile(httpClientProvider,
storageProperties, location, path, null);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStoragePositionOutputStream.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStoragePositionOutputStream.java
new file mode 100644
index 00000000000..9854e996e29
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStoragePositionOutputStream.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.PositionOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Optional;
+
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.CONTENT_RANGE;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_ALGORITHM;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_KEY;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_KEY_SHA256;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.LOCATION;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.UPLOAD_CONTENT_TYPE;
+
/**
 * Google Cloud Storage implementation of PositionOutputStream supporting resumable uploads.
 * Bytes are buffered locally and sent to the upload session URI in fixed-size chunks;
 * the final chunk at close finalizes the upload with the total object size.
 */
class GoogleCloudStoragePositionOutputStream extends PositionOutputStream {

    private static final String CONTENT_TYPE_OCTET_STREAM = "application/octet-stream";

    // Content-Range format: "bytes <first>-<last>/<total>" where total is "*" until the final chunk
    private static final String CONTENT_RANGE_FORMAT = "bytes %d-%d/%s";
    private static final String RANGE_TOTAL_UNKNOWN = "*";

    // Google-specific status indicating the session accepted the chunk and expects more bytes
    private static final int HTTP_RESUME_INCOMPLETE = 308;

    private static final Logger logger = LoggerFactory.getLogger(GoogleCloudStoragePositionOutputStream.class);

    private final HttpClientProvider httpClientProvider;
    private final GoogleCloudStorageProperties storageProperties;
    // Session URI returned from upload initiation: all chunk PUT requests target this URI
    private final String sessionUri;
    private final byte[] buffer;
    private final int chunkSize;
    // Number of bytes currently held in buffer awaiting upload
    private int bufferPosition;
    // Total bytes written to the stream across all chunks
    private long position;
    private boolean closed;

    /**
     * Google Cloud Storage Position OutputStream constructor initiates a resumable upload on creation
     *
     * @param httpClientProvider HTTP Client Provider
     * @param storageProperties Google Cloud Storage Properties
     * @param location Google Cloud Storage Location
     * @throws IOException Thrown on failure to initiate upload
     */
    GoogleCloudStoragePositionOutputStream(
            final HttpClientProvider httpClientProvider,
            final GoogleCloudStorageProperties storageProperties,
            final GoogleCloudStorageLocation location
    ) throws IOException {
        this.httpClientProvider = httpClientProvider;
        this.storageProperties = storageProperties;
        this.chunkSize = storageProperties.writeChunkSize();
        this.buffer = new byte[chunkSize];
        // Initiation performs a blocking HTTP request so construction can fail with IOException
        this.sessionUri = initiateUpload(location);
    }

    /**
     * Get stream position
     *
     * @return Current stream position
     */
    @Override
    public long getPos() {
        return position;
    }

    /**
     * Write byte to stream and increment position
     *
     * @param data byte to be written
     * @throws IOException Thrown on failure to send chunk
     */
    @Override
    public void write(final int data) throws IOException {
        ensureOpen();
        // Flush a full buffer before appending so buffer always has room for the new byte
        if (bufferPosition >= chunkSize) {
            sendChunk(false);
        }
        buffer[bufferPosition++] = (byte) data;
        position++;
    }

    /**
     * Write bytes to stream and increment position
     *
     * @param data bytes to be written
     * @param offset initial offset in bytes to be written
     * @param length number of bytes to be written
     * @throws IOException Thrown on failure to send chunk
     */
    @Override
    public void write(final byte[] data, final int offset, final int length) throws IOException {
        ensureOpen();

        int remaining = length;
        int currentOffset = offset;
        while (remaining > 0) {
            // Flush before copying so the copy below always has at least one byte of space
            if (bufferPosition >= chunkSize) {
                sendChunk(false);
            }
            final int space = chunkSize - bufferPosition;
            final int copyLength = Math.min(remaining, space);

            System.arraycopy(data, currentOffset, buffer, bufferPosition, copyLength);
            bufferPosition += copyLength;
            currentOffset += copyLength;
            remaining -= copyLength;
            position += copyLength;
        }
    }

    /**
     * Close stream and send final chunk
     *
     * @throws IOException Thrown on failure to send chunk
     */
    @Override
    public void close() throws IOException {
        if (!closed) {
            // Mark closed before finalizing so a failed finalization does not allow further writes
            closed = true;
            sendChunk(true);
        }
    }

    // Guard against writes after close
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
    }

    /**
     * Start a resumable upload session and return the session URI from the Location response header
     */
    private String initiateUpload(final GoogleCloudStorageLocation location) throws IOException {
        final String uri = storageProperties.uploadUri(location);
        final HttpRequest.Builder builder = httpClientProvider.newRequestBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .header(UPLOAD_CONTENT_TYPE.getHeader(), CONTENT_TYPE_OCTET_STREAM);

        addEncryptionHeaders(builder);

        final HttpResponse<String> response = httpClientProvider.send(builder.build(), HttpResponse.BodyHandlers.ofString());
        final int statusCode = response.statusCode();
        if (HTTP_OK == statusCode) {
            final HttpHeaders headers = response.headers();
            final Optional<String> locationHeaderFound = headers.firstValue(LOCATION.getHeader());
            if (locationHeaderFound.isPresent()) {
                logger.debug("Upload initiated [{}] HTTP {}", uri, statusCode);
                return locationHeaderFound.get();
            } else {
                throw new IOException("Initiate upload failed [%s] Location response header not found".formatted(location));
            }
        } else {
            final String responseBody = response.body();
            throw new IOException("Initiate upload failed [%s] HTTP %d [%s]".formatted(location, statusCode, responseBody));
        }
    }

    /**
     * Send buffered bytes to the session URI: intermediate chunks expect HTTP 308,
     * the final chunk expects HTTP 200 or 201 and declares the total object size
     */
    private void sendChunk(final boolean lastChunk) throws IOException {
        // Nothing buffered and not finalizing: no request needed
        if (bufferPosition == 0 && !lastChunk) {
            return;
        }

        final HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(sessionUri))
                .timeout(HttpClientProvider.REQUEST_TIMEOUT);

        if (bufferPosition == 0) {
            // Finalizing with no buffered bytes (empty object or size was an exact multiple of chunkSize)
            // NOTE(review): this PUT omits Content-Range; GCS documents "Content-Range: bytes */<total>"
            // for finalization without a body — confirm the service accepts the bare PUT
            builder.PUT(HttpRequest.BodyPublishers.noBody());
        } else {
            final long rangeStart = position - bufferPosition;
            final long rangeEnd = position - 1;
            // Total size is unknown ("*") until the last chunk fixes the final object length
            final String total = lastChunk ? String.valueOf(position) : RANGE_TOTAL_UNKNOWN;
            final String contentRange = CONTENT_RANGE_FORMAT.formatted(rangeStart, rangeEnd, total);

            builder.PUT(HttpRequest.BodyPublishers.ofByteArray(buffer, 0, bufferPosition))
                    .header(CONTENT_RANGE.getHeader(), contentRange);
        }

        final HttpResponse<String> response = httpClientProvider.send(builder.build(), HttpResponse.BodyHandlers.ofString());
        final int statusCode = response.statusCode();
        logger.debug("Buffer uploaded [{}] HTTP {}", sessionUri, statusCode);

        if (lastChunk) {
            if (statusCode != HTTP_OK && statusCode != HTTP_CREATED) {
                throw new IOException("Upload finalization failed HTTP %d [%s]".formatted(statusCode, response.body()));
            }
        } else {
            if (statusCode != HTTP_RESUME_INCOMPLETE) {
                throw new IOException("Chunk upload failed HTTP %d [%s]".formatted(statusCode, response.body()));
            }
        }

        bufferPosition = 0;
    }

    /**
     * Add customer-supplied encryption key headers when an encryption key is configured
     */
    private void addEncryptionHeaders(final HttpRequest.Builder builder) {
        final String key = storageProperties.encryptionKey();
        if (key != null && !key.isBlank()) {
            builder.header(ENCRYPTION_ALGORITHM.getHeader(), storageProperties.encryptionAlgorithm());
            builder.header(ENCRYPTION_KEY.getHeader(), key);
            builder.header(ENCRYPTION_KEY_SHA256.getHeader(), KeyDigestProvider.getDigestEncoded(key));
        }
    }
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperties.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperties.java
new file mode 100644
index 00000000000..701aca1bf87
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperties.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import java.util.Map;
+import java.util.Objects;
+
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.DECRYPTION_KEY;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.ENCRYPTION_KEY;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.READ_CHUNK_SIZE_BYTES;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.SERVICE_HOST;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.USER_PROJECT;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.WRITE_CHUNK_SIZE_BYTES;
+
+/**
+ * Google Cloud Storage Properties encapsulating value parsing and URI
resolution
+ */
+class GoogleCloudStorageProperties {
+
+ static final String DEFAULT_SERVICE_HOST =
"https://storage.googleapis.com";
+ static final int DEFAULT_WRITE_CHUNK_SIZE = 8 * 1024 * 1024;
+ static final int DEFAULT_READ_CHUNK_SIZE = 2 * 1024 * 1024;
+
+ private static final String METADATA_URI_FORMAT =
"%s/storage/v1/b/%s/o/%s";
+ private static final String UPLOAD_URI_FORMAT =
"%s/upload/storage/v1/b/%s/o?uploadType=resumable&name=%s";
+ private static final String DOWNLOAD_URI_FORMAT =
"%s/storage/v1/b/%s/o/%s?alt=media";
+ private static final String USER_PROJECT_FIRST_FORMAT =
"%s?userProject=%s";
+ private static final String USER_PROJECT_ADDITIONAL_FORMAT =
"%s&userProject=%s";
+
+ private static final String ENCRYPTION_ALGORITHM = "AES256";
+
+ private final String serviceHost;
+ private final String userProject;
+ private final String encryptionKey;
+ private final String decryptionKey;
+ private final int writeChunkSize;
+ private final int readChunkSize;
+
+ GoogleCloudStorageProperties(final Map<String, String> properties) {
+ Objects.requireNonNull(properties, "Properties required");
+
+ final String serviceHost = properties.get(SERVICE_HOST.getProperty());
+ if (serviceHost == null || serviceHost.isBlank()) {
+ this.serviceHost = DEFAULT_SERVICE_HOST;
+ } else {
+ this.serviceHost = serviceHost;
+ }
+
+ this.userProject = properties.get(USER_PROJECT.getProperty());
+ this.encryptionKey = properties.get(ENCRYPTION_KEY.getProperty());
+ this.decryptionKey = properties.get(DECRYPTION_KEY.getProperty());
+
+ final String writeChunkSizeBytes =
properties.get(WRITE_CHUNK_SIZE_BYTES.getProperty());
+ if (writeChunkSizeBytes == null) {
+ this.writeChunkSize = DEFAULT_WRITE_CHUNK_SIZE;
+ } else {
+ this.writeChunkSize = Integer.parseInt(writeChunkSizeBytes);
+ }
+
+ final String readChunkSizeBytes =
properties.get(READ_CHUNK_SIZE_BYTES.getProperty());
+ if (readChunkSizeBytes == null) {
+ this.readChunkSize = DEFAULT_READ_CHUNK_SIZE;
+ } else {
+ this.readChunkSize = Integer.parseInt(readChunkSizeBytes);
+ }
+ }
+
+ String encryptionAlgorithm() {
+ return ENCRYPTION_ALGORITHM;
+ }
+
+ String encryptionKey() {
+ return encryptionKey;
+ }
+
+ String decryptionKey() {
+ return decryptionKey;
+ }
+
+ int writeChunkSize() {
+ return writeChunkSize;
+ }
+
+ int readChunkSize() {
+ return readChunkSize;
+ }
+
+ String metadataUri(final GoogleCloudStorageLocation location) {
+ final String uri = METADATA_URI_FORMAT.formatted(serviceHost,
location.bucket(), location.encodedObjectKey());
+
+ final String metadataUri;
+ if (userProject == null) {
+ metadataUri = uri;
+ } else {
+ metadataUri = USER_PROJECT_FIRST_FORMAT.formatted(uri,
userProject);
+ }
+
+ return metadataUri;
+ }
+
+ String uploadUri(final GoogleCloudStorageLocation location) {
+ final String uri = UPLOAD_URI_FORMAT.formatted(serviceHost,
location.bucket(), location.encodedObjectKey());
+
+ final String uploadUri;
+ if (userProject == null) {
+ uploadUri = uri;
+ } else {
+ uploadUri = USER_PROJECT_ADDITIONAL_FORMAT.formatted(uri,
userProject);
+ }
+
+ return uploadUri;
+ }
+
+ String downloadUri(final GoogleCloudStorageLocation location) {
+ final String uri = DOWNLOAD_URI_FORMAT.formatted(serviceHost,
location.bucket(), location.encodedObjectKey());
+
+ final String downloadUri;
+ if (userProject == null) {
+ downloadUri = uri;
+ } else {
+ downloadUri = USER_PROJECT_ADDITIONAL_FORMAT.formatted(uri,
userProject);
+ }
+
+ return downloadUri;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperty.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperty.java
new file mode 100644
index 00000000000..11e785e5df4
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperty.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
/**
 * Google Cloud Storage configuration properties aligned with Apache Iceberg GCPProperties
 */
enum GoogleCloudStorageProperty {

    /** OAuth2 Bearer Token for request authorization */
    OAUTH2_TOKEN("gcs.oauth2.token"),

    /** Service host overriding the default Google Cloud Storage endpoint */
    SERVICE_HOST("gcs.service.host"),

    /** Buffer size in bytes for resumable upload chunks */
    WRITE_CHUNK_SIZE_BYTES("gcs.channel.write.chunk-size-bytes"),

    /** Buffer size in bytes for ranged download chunks */
    READ_CHUNK_SIZE_BYTES("gcs.channel.read.chunk-size-bytes"),

    /** Project billed for requester-pays bucket access */
    USER_PROJECT("gcs.user-project"),

    /** Customer-supplied encryption key for object writes */
    ENCRYPTION_KEY("gcs.encryption-key"),

    /** Customer-supplied encryption key for object reads */
    DECRYPTION_KEY("gcs.decryption-key");

    private final String propertyName;

    GoogleCloudStorageProperty(final String propertyName) {
        this.propertyName = propertyName;
    }

    /**
     * Get configuration property name
     *
     * @return Property name
     */
    String getProperty() {
        return propertyName;
    }
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageSeekableInputStream.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageSeekableInputStream.java
new file mode 100644
index 00000000000..1cb718dede5
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageSeekableInputStream.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PARTIAL;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.CONTENT_RANGE;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_ALGORITHM;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_KEY;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_KEY_SHA256;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.RANGE;
+
+/**
+ * Google Cloud Storage implementation of SeekableInputStream supporting
chunked buffered reads
+ */
class GoogleCloudStorageSeekableInputStream extends SeekableInputStream {

    // Matches the total object size at the end of a Content-Range value, e.g. "bytes 0-99/1234"
    private static final Pattern CONTENT_RANGE_TOTAL = Pattern.compile("/(\\d+)$");
    private static final int FIRST_GROUP = 1;
    private static final int END_OF_STREAM = -1;

    // Range request header value format: inclusive start and end offsets
    private static final String RANGE_FORMAT = "bytes=%d-%d";

    // 416 Range Not Satisfiable: not available as a constant on HttpURLConnection
    private static final int HTTP_RANGE_NOT_SATISFIABLE = 416;

    private static final Logger logger = LoggerFactory.getLogger(GoogleCloudStorageSeekableInputStream.class);

    private final HttpClientProvider httpClientProvider;
    private final GoogleCloudStorageLocation location;
    private final int readChunkSize;
    private final String downloadUri;
    private final String decryptionKey;
    private final String encryptionAlgorithm;

    // Absolute offset in the object from which the next read occurs
    private long pos;
    // Total object size: meaningful only while contentLengthProvided is true
    private long contentLength;
    private boolean contentLengthProvided;
    // Most recently fetched chunk: bufferStart is the absolute object offset of buffer[0],
    // bufferLength the number of valid bytes in buffer
    private byte[] buffer;
    private long bufferStart;
    private int bufferLength;
    private boolean closed;

    /**
     * Google Cloud Storage SeekableInputStream constructor with optional Content Length
     *
     * @param httpClientProvider HTTP Client Provider
     * @param storageProperties Google Cloud Storage Properties
     * @param location Google Cloud Storage Location
     * @param contentLength Content Length or null when not known
     */
    GoogleCloudStorageSeekableInputStream(
            final HttpClientProvider httpClientProvider,
            final GoogleCloudStorageProperties storageProperties,
            final GoogleCloudStorageLocation location,
            final Long contentLength
    ) {
        this.httpClientProvider = httpClientProvider;
        this.location = location;
        this.readChunkSize = storageProperties.readChunkSize();
        this.downloadUri = storageProperties.downloadUri(location);
        this.decryptionKey = storageProperties.decryptionKey();
        this.encryptionAlgorithm = storageProperties.encryptionAlgorithm();
        // When the length is unknown it is derived later from the Content-Range response header
        if (contentLength != null) {
            this.contentLength = contentLength;
            this.contentLengthProvided = true;
        }
    }

    /**
     * Get current stream position
     *
     * @return Current position
     */
    @Override
    public long getPos() {
        return pos;
    }

    /**
     * Seek to new position: no remote request is made until the next read
     *
     * @param newPos New position
     */
    @Override
    public void seek(final long newPos) {
        if (newPos < 0) {
            throw new IllegalArgumentException("Seek position must not be negative [%d]".formatted(newPos));
        }
        // The buffer is retained: ensureBuffer() decides on the next read whether it still covers pos
        this.pos = newPos;
    }

    /**
     * Read data and buffer as needed
     *
     * @return Data byte read as an unsigned value, or -1 at end of stream
     * @throws IOException Thrown on failure to buffer
     */
    @Override
    public int read() throws IOException {
        ensureOpen();
        if (contentLengthProvided && pos >= contentLength) {
            return END_OF_STREAM;
        }

        ensureBuffer();
        if (bufferLength == 0) {
            return END_OF_STREAM;
        }

        final int offset = (int) (pos - bufferStart);
        if (offset >= bufferLength) {
            return END_OF_STREAM;
        }

        pos++;
        // Mask to return the byte as an unsigned int per InputStream contract
        return buffer[offset] & 0xFF;
    }

    /**
     * Read data into buffer with start offset and length requested
     *
     * @param data Data buffer for bytes read
     * @param offset Start offset for data buffer
     * @param length Maximum number of bytes to read
     * @return Number of bytes read, or -1 when no bytes are available
     * @throws IOException Thrown on failure to buffer
     */
    @Override
    public int read(final byte[] data, final int offset, final int length) throws IOException {
        // NOTE(review): relies on callers passing valid offset/length; no explicit bounds check here
        ensureOpen();
        if (length == 0) {
            return 0;
        }
        if (contentLengthProvided && pos >= contentLength) {
            return END_OF_STREAM;
        }

        int totalRead = 0;
        int remaining = length;
        int dataOffset = offset;

        // Copy from the chunk buffer, refilling until the request is satisfied or the object ends
        while (remaining > 0) {
            ensureBuffer();
            if (bufferLength == 0) {
                break;
            }
            final int bufOffset = (int) (pos - bufferStart);
            if (bufOffset >= bufferLength) {
                break;
            }
            final int available = bufferLength - bufOffset;
            final int copyLength = Math.min(remaining, available);
            System.arraycopy(buffer, bufOffset, data, dataOffset, copyLength);
            pos += copyLength;
            totalRead += copyLength;
            dataOffset += copyLength;
            remaining -= copyLength;
        }

        return totalRead == 0 ? END_OF_STREAM : totalRead;
    }

    /**
     * Get number of bytes available in buffer for reading without a remote request
     *
     * @return Number of bytes available for reading
     */
    @Override
    public int available() {
        if (buffer == null) {
            return 0;
        }
        final int bufferOffset = (int) (pos - bufferStart);
        final int bufferRemaining = bufferLength - bufferOffset;
        // Negative when pos was moved before bufferStart via seek()
        return Math.max(0, bufferRemaining);
    }

    /**
     * Close stream and clear buffer: no remote resources require release
     */
    @Override
    public void close() {
        closed = true;
        buffer = null;
    }

    // Reject reads after close()
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
    }

    // Fetch a new chunk unless the current buffer already covers pos
    private void ensureBuffer() throws IOException {
        if (buffer != null) {
            final int offset = (int) (pos - bufferStart);
            if (offset >= 0 && offset < bufferLength) {
                return;
            }
        }
        fillBuffer();
    }

    // Issue a ranged GET starting at pos for up to readChunkSize bytes
    private void fillBuffer() throws IOException {
        final long rangeStart = pos;
        final long rangeEnd = rangeStart + readChunkSize - 1;
        final String range = RANGE_FORMAT.formatted(rangeStart, rangeEnd);

        final HttpRequest.Builder builder = httpClientProvider.newRequestBuilder(downloadUri)
                .GET()
                .header(RANGE.getHeader(), range);

        addDecryptionHeaders(builder);

        final HttpResponse<byte[]> response = httpClientProvider.send(builder.build(), HttpResponse.BodyHandlers.ofByteArray());
        final int statusCode = response.statusCode();
        logger.debug("Read object [{}] Range [{}] HTTP {}", downloadUri, range, statusCode);

        // Set empty buffer when Google Cloud Storage cannot provide range requested
        if (HTTP_RANGE_NOT_SATISFIABLE == statusCode) {
            bufferStart = pos;
            bufferLength = 0;
            if (buffer == null) {
                buffer = new byte[0];
            }
            return;
        }

        // HTTP 200 is also accepted: presumably the full object when the server ignores Range — confirm
        if (statusCode != HTTP_OK && statusCode != HTTP_PARTIAL) {
            throw new IOException("Read object failed [%s] Range [%s] HTTP %d".formatted(location, range, statusCode));
        }

        // Set Content-Length when not provided
        if (!contentLengthProvided) {
            parseContentLength(response);
        }

        buffer = response.body();
        bufferStart = rangeStart;
        bufferLength = buffer.length;
    }

    // Derive total object size from the Content-Range header suffix, e.g. "/1234"
    private void parseContentLength(final HttpResponse<?> response) {
        response.headers().firstValue(CONTENT_RANGE.getHeader()).ifPresent(header -> {
            final Matcher matcher = CONTENT_RANGE_TOTAL.matcher(header);
            if (matcher.find()) {
                final String contentRangeTotal = matcher.group(FIRST_GROUP);
                contentLength = Long.parseLong(contentRangeTotal);
                contentLengthProvided = true;
            }
        });
    }

    // Add customer-supplied encryption key headers when a decryption key is configured
    private void addDecryptionHeaders(final HttpRequest.Builder builder) {
        if (decryptionKey != null && !decryptionKey.isBlank()) {
            builder.header(ENCRYPTION_ALGORITHM.getHeader(), encryptionAlgorithm);
            builder.header(ENCRYPTION_KEY.getHeader(), decryptionKey);
            builder.header(ENCRYPTION_KEY_SHA256.getHeader(), KeyDigestProvider.getDigestEncoded(decryptionKey));
        }
    }
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpClientProvider.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpClientProvider.java
new file mode 100644
index 00000000000..3c2916ed01d
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpClientProvider.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.Set;
+
+import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
+import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.AUTHORIZATION;
+
+/**
+ * HTTP Client Provider encapsulates request and response methods with a
configured HTTP Client and standard settings
+ */
class HttpClientProvider {

    private static final Logger logger = LoggerFactory.getLogger(HttpClientProvider.class);

    private static final String BEARER_FORMAT = "Bearer %s";

    /** Connect Timeout based on default setting from Google Cloud Storage library */
    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(20);

    /** Request Timeout provides failsafe behavior */
    static final Duration REQUEST_TIMEOUT = Duration.ofMinutes(5);

    private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.ofSeconds(1);

    // Combined with the attempt number to produce a linearly increasing delay in sleep()
    private static final int SLEEP_MULTIPLIER = 5;

    private static final int MAX_ATTEMPTS = 3;

    // 429 Too Many Requests: not available as a constant on HttpURLConnection
    private static final int HTTP_TOO_MANY_REQUESTS = 429;

    /** HTTP status codes considered retriable aligned with Google Cloud Storage library */
    static final Set<Integer> RETRIABLE_STATUS_CODES = Set.of(
            HTTP_CLIENT_TIMEOUT,
            HTTP_TOO_MANY_REQUESTS,
            HTTP_INTERNAL_ERROR,
            HTTP_BAD_GATEWAY,
            HTTP_UNAVAILABLE,
            HTTP_GATEWAY_TIMEOUT
    );

    // Nullable: requests are sent without an Authorization header when null
    private final String bearerToken;
    private final HttpClient httpClient;
    private final Duration initialBackoff;

    HttpClientProvider(final String bearerToken) {
        this(bearerToken, DEFAULT_INITIAL_BACKOFF);
    }

    // Backoff duration parameterized for testability
    HttpClientProvider(final String bearerToken, final Duration initialBackoff) {
        this.bearerToken = bearerToken;
        this.initialBackoff = initialBackoff;
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(CONNECT_TIMEOUT)
                .build();
    }

    /**
     * Send HTTP request with retry on status codes considered retriable according to Google Cloud Storage API.
     * A retriable status on the final attempt is returned to the caller rather than thrown.
     *
     * @param request HTTP Request to be executed
     * @param bodyHandler HTTP Response Body Handler
     * @return HTTP Response
     * @param <T> HTTP Response Body Type
     * @throws IOException Thrown on failure after retries exhausted
     */
    <T> HttpResponse<T> send(final HttpRequest request, final HttpResponse.BodyHandler<T> bodyHandler) throws IOException {
        HttpResponse<T> response = null;
        IOException lastException = null;

        final String requestMethod = request.method();
        final URI requestUri = request.uri();
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                response = httpClient.send(request, bodyHandler);

                final int statusCode = response.statusCode();
                if (RETRIABLE_STATUS_CODES.contains(statusCode) && attempt < MAX_ATTEMPTS) {
                    logger.info("{} [{}] HTTP {} for attempt {} of {}", requestMethod, requestUri, statusCode, attempt, MAX_ATTEMPTS);
                    sleep(attempt);
                    continue;
                }

                return response;
            } catch (final InterruptedException e) {
                // Restore interrupt status before surfacing the failure as an IOException
                Thread.currentThread().interrupt();
                throw new IOException("%s [%s] request interrupted".formatted(requestMethod, requestUri), e);
            } catch (final IOException e) {
                lastException = e;
                if (attempt < MAX_ATTEMPTS) {
                    logger.info("{} [{}] request failed for attempt {} of {}", requestMethod, requestUri, attempt, MAX_ATTEMPTS, e);
                    sleep(attempt);
                }
            }
        }

        // NOTE(review): when an early attempt returned a retriable status and later attempts threw
        // IOException, the stale retriable response is returned here instead of lastException — confirm intended
        if (response == null) {
            throw lastException;
        }

        return response;
    }

    // Build a request with the failsafe timeout and optional bearer token Authorization header
    HttpRequest.Builder newRequestBuilder(final String uri) {
        final HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(uri))
                .timeout(REQUEST_TIMEOUT);
        if (bearerToken != null) {
            builder.header(AUTHORIZATION.getHeader(), BEARER_FORMAT.formatted(bearerToken));
        }
        return builder;
    }

    // HttpClient.close() requires Java 21 or later
    void close() {
        httpClient.close();
    }

    // Linear backoff: attempt * initialBackoff * SLEEP_MULTIPLIER milliseconds
    private void sleep(final int attempt) throws IOException {
        final long duration = initialBackoff.toMillis() * attempt * SLEEP_MULTIPLIER;
        try {
            Thread.sleep(duration);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Retry interrupted", e);
        }
    }
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpRequestException.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpRequestException.java
new file mode 100644
index 00000000000..61492999310
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpRequestException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+/**
+ * HTTP Request Exception indicating failures related to socket communication
or request preparation
+ */
class HttpRequestException extends RuntimeException {

    /**
     * HTTP Request Exception constructor wrapping the underlying request failure
     *
     * @param message Failure description
     * @param cause Underlying cause of the request failure
     */
    HttpRequestException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpResponseException.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpResponseException.java
new file mode 100644
index 00000000000..77a0417595d
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpResponseException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+/**
+ * HTTP Response Exception indicating failures related to status codes or
headers
+ */
class HttpResponseException extends RuntimeException {

    /**
     * HTTP Response Exception constructor for status code or header failures
     *
     * @param message Failure description including status information
     */
    HttpResponseException(final String message) {
        super(message);
    }
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProvider.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProvider.java
new file mode 100644
index 00000000000..1ed63acc3e2
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+/**
+ * Computes a Base64-encoded SHA-256 digest of a Base64-encoded encryption key,
+ * as required by the GCS JSON API customer-supplied encryption key headers.
+ */
class KeyDigestProvider {

    private static final String DIGEST_ALGORITHM = "SHA-256";

    // Utility class: no instances
    private KeyDigestProvider() {
    }

    /**
     * Compute the Base64-encoded SHA-256 digest of a Base64-encoded encryption key
     *
     * @param keyEncoded Base64-encoded encryption key
     * @return Base64-encoded SHA-256 digest of the decoded key bytes
     */
    static String getDigestEncoded(final String keyEncoded) {
        final byte[] decodedKey = Base64.getDecoder().decode(keyEncoded);
        final MessageDigest messageDigest;
        try {
            messageDigest = MessageDigest.getInstance(DIGEST_ALGORITHM);
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not found", e);
        }
        final byte[] keyDigest = messageDigest.digest(decodedKey);
        return Base64.getEncoder().encodeToString(keyDigest);
    }
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 00000000000..e29160e46bd
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.iceberg.gcs.GCSIcebergFileIOProvider
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProviderTest.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProviderTest.java
new file mode 100644
index 00000000000..8eb68f74dcb
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProviderTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.FileIO;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.iceberg.ProviderContext;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.OAUTH2_TOKEN;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.SERVICE_HOST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class GCSIcebergFileIOProviderTest {
+ private static final String SERVICE_ID =
GCSIcebergFileIOProvider.class.getSimpleName();
+ private static final String ACCESS_TOKEN = "access-token";
+ private static final String LOCALHOST_URL = "http://localhost:9000";
+
+ private TestRunner runner;
+
+ private GCSIcebergFileIOProvider provider;
+
+ @BeforeEach
+ void setProvider() throws InitializationException {
+ provider = new GCSIcebergFileIOProvider();
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.addControllerService(SERVICE_ID, provider);
+ }
+
+ @AfterEach
+ void disableProvider() {
+ runner.disableControllerService(provider);
+ }
+
+ @Test
+ void testGetFileIO() {
+ runner.enableControllerService(provider);
+
+ final Map<String, String> properties = Map.of();
+ final ProviderContext providerContext = () -> properties;
+
+ try (FileIO fileIO = provider.getFileIO(providerContext)) {
+ assertNotNull(fileIO);
+ assertInstanceOf(GoogleCloudStorageFileIO.class, fileIO);
+ }
+ }
+
+ @Test
+ void testGetFileIOWithVendedToken() {
+ runner.enableControllerService(provider);
+
+ final Map<String, String> properties = Map.of(
+ OAUTH2_TOKEN.getProperty(), ACCESS_TOKEN,
+ SERVICE_HOST.getProperty(), LOCALHOST_URL
+ );
+ final ProviderContext providerContext = () -> properties;
+
+ try (FileIO fileIO = provider.getFileIO(providerContext)) {
+ assertNotNull(fileIO);
+ assertInstanceOf(GoogleCloudStorageFileIO.class, fileIO);
+ final Map<String, String> configuredProperties =
fileIO.properties();
+ assertEquals(ACCESS_TOKEN,
configuredProperties.get(OAUTH2_TOKEN.getProperty()));
+ assertEquals(LOCALHOST_URL,
configuredProperties.get(SERVICE_HOST.getProperty()));
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIOTest.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIOTest.java
new file mode 100644
index 00000000000..376656e92b9
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIOTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import mockwebserver3.RecordedRequest;
+import okio.ByteString;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_NO_CONTENT;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PARTIAL;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.AUTHORIZATION;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.CONTENT_RANGE;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.LOCATION;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.OAUTH2_TOKEN;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.READ_CHUNK_SIZE_BYTES;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.SERVICE_HOST;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.USER_PROJECT;
+import static
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.WRITE_CHUNK_SIZE_BYTES;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class GoogleCloudStorageFileIOTest {
+
+ private static final String TEST_TOKEN = "test-token";
+ private static final String TOKEN = "token";
+ private static final String BEARER_TEST_TOKEN = "Bearer test-token";
+
+ private static final String DELETE_METHOD = "DELETE";
+ private static final String POST_METHOD = "POST";
+ private static final String PUT_METHOD = "PUT";
+
+ private static final String UPLOAD_SESSION_PATH = "/upload-session";
+
+ private static final String BUCKET_PATH_FILE_URI =
"gs://bucket/path/to/file.parquet";
+ private static final String BUCKET_MISSING_FILE_URI =
"gs://bucket/missing-file.parquet";
+ private static final String BUCKET_EXISTS_URI =
"gs://bucket/exists.parquet";
+ private static final String BUCKET_MISSING_URI =
"gs://bucket/missing.parquet";
+ private static final String BUCKET_FILE_PARQUET_URI =
"gs://bucket/file.parquet";
+ private static final String BUCKET_OUTPUT_URI =
"gs://bucket/output.parquet";
+ private static final String BUCKET_FILE_TXT_URI = "gs://bucket/file.txt";
+ private static final String BUCKET_KEY_URI = "gs://bucket/objectKey";
+
+ private static final String CONTENT_RANGE_FORMAT = "bytes %d-%d/%d";
+ private static final String SIZE_RESPONSE_FORMAT = """
+ {"size": "%d"}""";
+ private static final String SIZE_RESPONSE_12345 = """
+ {"size": "12345"}""";
+ private static final String USER_PROJECT_QUERY = "userProject=my-project";
+ private static final String MY_PROJECT = "my-project";
+
+ private static final String HELLO_WORLD = "hello world";
+ private static final String READ_CONTENT = "test file content for reading";
+ private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ private static final String READ_CHUNK_SIZE_10 = "10";
+ private static final String LOCALHOST_URL_FORMAT = "http://localhost:%d";
+
+ private static final long EXPECTED_LENGTH = 12345L;
+ private static final long KNOWN_LENGTH = 42L;
+
+ private MockWebServer mockWebServer;
+ private GoogleCloudStorageFileIO fileIO;
+ private String baseUrl;
+
+ @BeforeEach
+ void setServer() throws IOException {
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+ baseUrl = LOCALHOST_URL_FORMAT.formatted(mockWebServer.getPort());
+ }
+
+ @AfterEach
+ void closeServer() {
+ if (fileIO != null) {
+ fileIO.close();
+ }
+ mockWebServer.close();
+ }
+
+ @Test
+ void testInitializeProperties() {
+ fileIO = new GoogleCloudStorageFileIO();
+ final Map<String, String> props = Map.of(
+ OAUTH2_TOKEN.getProperty(), TEST_TOKEN,
+ SERVICE_HOST.getProperty(), baseUrl
+ );
+ fileIO.initialize(props);
+
+ assertEquals(props, fileIO.properties());
+ }
+
+ @Test
+ void testDeleteFile() throws InterruptedException {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_NO_CONTENT)
+ .build());
+
+ fileIO = createFileIO(TEST_TOKEN);
+ fileIO.deleteFile(BUCKET_PATH_FILE_URI);
+
+ final RecordedRequest request = mockWebServer.takeRequest();
+ assertEquals(DELETE_METHOD, request.getMethod());
+ assertEquals(BEARER_TEST_TOKEN,
request.getHeaders().get(AUTHORIZATION.getHeader()));
+ }
+
+ @Test
+ void testDeleteFileNotFoundIgnored() {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_NOT_FOUND)
+ .build());
+
+ fileIO = createFileIO(null);
+ fileIO.deleteFile(BUCKET_MISSING_FILE_URI);
+ }
+
+ @Test
+ void testDeleteFileFailedForbidden() {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_FORBIDDEN)
+ .build());
+
+ fileIO = createFileIO(null);
+ assertThrows(HttpResponseException.class, () ->
fileIO.deleteFile(BUCKET_PATH_FILE_URI));
+ }
+
+ @Test
+ void testDeleteFileRequestFailed() {
+ mockWebServer.close();
+
+ fileIO = createFileIO(null);
+ assertThrows(HttpRequestException.class, () ->
fileIO.deleteFile(BUCKET_PATH_FILE_URI));
+ }
+
+ @Test
+ void testInputFileExists() {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_OK)
+ .build());
+
+ fileIO = createFileIO(TOKEN);
+ assertTrue(fileIO.newInputFile(BUCKET_EXISTS_URI).exists());
+ }
+
+ @Test
+ void testInputFileNotExists() {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_NOT_FOUND)
+ .build());
+
+ fileIO = createFileIO(null);
+ assertFalse(fileIO.newInputFile(BUCKET_MISSING_URI).exists());
+ }
+
+ @Test
+ void testInputFileGetLength() {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_OK)
+ .body(SIZE_RESPONSE_12345)
+ .build());
+
+ fileIO = createFileIO(TOKEN);
+ assertEquals(EXPECTED_LENGTH,
fileIO.newInputFile(BUCKET_FILE_PARQUET_URI).getLength());
+ }
+
+ @Test
+ void testInputFileKnownLength() {
+ fileIO = createFileIO(null);
+
+ final InputFile input = fileIO.newInputFile(BUCKET_FILE_PARQUET_URI,
KNOWN_LENGTH);
+ assertEquals(KNOWN_LENGTH, input.getLength());
+ }
+
+ @Test
+ void testOutputFileCreateAndWrite() throws IOException,
InterruptedException {
+ final String sessionUrl = baseUrl + UPLOAD_SESSION_PATH;
+
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_OK)
+ .addHeader(LOCATION.getHeader(), sessionUrl)
+ .build());
+
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_OK)
+ .build());
+
+ fileIO = createFileIO(TOKEN, Map.of(
+ WRITE_CHUNK_SIZE_BYTES.getProperty(),
String.valueOf(GoogleCloudStorageProperties.DEFAULT_WRITE_CHUNK_SIZE)
+ ));
+
+ final OutputFile outputFile = fileIO.newOutputFile(BUCKET_OUTPUT_URI);
+ assertNotNull(outputFile.location());
+
+ final byte[] data = HELLO_WORLD.getBytes(StandardCharsets.UTF_8);
+ try (PositionOutputStream out = outputFile.createOrOverwrite()) {
+ out.write(data);
+ assertEquals(data.length, out.getPos());
+ }
+
+ final RecordedRequest initRequest = mockWebServer.takeRequest();
+ assertEquals(POST_METHOD, initRequest.getMethod());
+
+ final RecordedRequest uploadRequest = mockWebServer.takeRequest();
+ assertEquals(PUT_METHOD, uploadRequest.getMethod());
+
+ final ByteString requestBody = uploadRequest.getBody();
+ assertNotNull(requestBody);
+ assertArrayEquals(data, requestBody.toByteArray());
+ }
+
+ @Test
+ void testReadClose() throws IOException {
+ final byte[] content = READ_CONTENT.getBytes(StandardCharsets.UTF_8);
+
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_OK)
+ .body(SIZE_RESPONSE_FORMAT.formatted(content.length))
+ .build());
+
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_PARTIAL)
+ .addHeader(CONTENT_RANGE.getHeader(),
CONTENT_RANGE_FORMAT.formatted(0, content.length - 1, content.length))
+ .body(READ_CONTENT)
+ .build());
+
+ fileIO = createFileIO(TOKEN);
+
+ final InputFile inputFile = fileIO.newInputFile(BUCKET_FILE_TXT_URI);
+ assertEquals(content.length, inputFile.getLength());
+
+ try (SeekableInputStream stream = inputFile.newStream()) {
+ final byte[] result = new byte[content.length];
+ int totalRead = 0;
+ while (totalRead < content.length) {
+ final int n = stream.read(result, totalRead, content.length -
totalRead);
+ if (n < 0) {
+ break;
+ }
+ totalRead += n;
+ }
+ assertEquals(content.length, totalRead);
+ assertArrayEquals(content, result);
+ assertEquals(content.length, stream.getPos());
+ }
+ }
+
+ @Test
+ void testSeekableInputStreamSeek() throws IOException {
+ final byte[] content = ALPHABET.getBytes(StandardCharsets.UTF_8);
+
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_PARTIAL)
+ .addHeader(CONTENT_RANGE.getHeader(),
CONTENT_RANGE_FORMAT.formatted(10, 19, content.length))
+ .body(ALPHABET.substring(10, 20))
+ .build());
+
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_PARTIAL)
+ .addHeader(CONTENT_RANGE.getHeader(),
CONTENT_RANGE_FORMAT.formatted(25, 25, content.length))
+ .body(ALPHABET.substring(25, 26))
+ .build());
+
+ fileIO = createFileIO(TOKEN, Map.of(
+ READ_CHUNK_SIZE_BYTES.getProperty(), READ_CHUNK_SIZE_10
+ ));
+
+ try (SeekableInputStream stream =
fileIO.newInputFile(BUCKET_FILE_TXT_URI, content.length).newStream()) {
+ assertEquals(0, stream.getPos());
+
+ stream.seek(10);
+ assertEquals(10, stream.getPos());
+ assertEquals('K', stream.read());
+ assertEquals(11, stream.getPos());
+
+ stream.seek(25);
+ assertEquals('Z', stream.read());
+ assertEquals(-1, stream.read());
+ }
+ }
+
+ @Test
+ void testUserProjectQueryParam() {
+ final GoogleCloudStorageProperties storageProperties = new
GoogleCloudStorageProperties(Map.of(
+ SERVICE_HOST.getProperty(), baseUrl,
+ USER_PROJECT.getProperty(), MY_PROJECT
+ ));
+
+ final GoogleCloudStorageLocation loc =
GoogleCloudStorageLocation.parse(BUCKET_KEY_URI);
+
assertTrue(storageProperties.metadataUri(loc).contains(USER_PROJECT_QUERY));
+
assertTrue(storageProperties.uploadUri(loc).contains(USER_PROJECT_QUERY));
+
assertTrue(storageProperties.downloadUri(loc).contains(USER_PROJECT_QUERY));
+ }
+
+ private GoogleCloudStorageFileIO createFileIO(final String token) {
+ return createFileIO(token, Map.of());
+ }
+
+ private GoogleCloudStorageFileIO createFileIO(final String token, final
Map<String, String> extraProps) {
+ final GoogleCloudStorageFileIO io = new GoogleCloudStorageFileIO();
+ final Map<String, String> props = new HashMap<>();
+ props.put(SERVICE_HOST.getProperty(), baseUrl);
+ if (token != null) {
+ props.put(OAUTH2_TOKEN.getProperty(), token);
+ }
+ props.putAll(extraProps);
+ io.initialize(props);
+ return io;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocationTest.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocationTest.java
new file mode 100644
index 00000000000..2eb1cb63b10
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocationTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class GoogleCloudStorageLocationTest {
+
+ private static final String OBJECT_BUCKET = "object-bucket";
+ private static final String SIMPLE_KEY = "path/to/object.parquet";
+ private static final String OBJECT_URI =
"gs://%s/%s".formatted(OBJECT_BUCKET, SIMPLE_KEY);
+ private static final String BUCKET = "bucket";
+ private static final String OBJECT_PARQUET = "object.parquet";
+ private static final String SINGLE_SEGMENT_URI =
"gs://%s/%s".formatted(BUCKET, OBJECT_PARQUET);
+ private static final String DEEP_KEY = "db/table/data/00000-0-abc.parquet";
+ private static final String DEEP_PATH_URI =
"gs://bucket/%s".formatted(DEEP_KEY);
+
+ private static final String MULTI_SEGMENT_URI = "gs://bucket/a/b/c";
+ private static final String SPACES_URI = "gs://bucket/path with
spaces/file name.txt";
+ private static final String ENCODED_MULTI_SEGMENT = "a%2Fb%2Fc";
+ private static final String ENCODED_SPACES =
"path%20with%20spaces%2Ffile%20name.txt";
+
+ private static final String SIMPLE_BUCKET_KEY_URI =
"gs://bucket/objectKey";
+ private static final String INVALID_SCHEME_URI = "s3://bucket/objectKey";
+ private static final String NO_BUCKET_URI = "gs:///objectKey";
+ private static final String NO_KEY_URI = "gs://bucket/";
+ private static final String NO_SLASH_URI = "gs://bucket";
+
+ @Test
+ void testParseSimple() {
+ final GoogleCloudStorageLocation location =
GoogleCloudStorageLocation.parse(OBJECT_URI);
+ assertEquals(OBJECT_BUCKET, location.bucket());
+ assertEquals(SIMPLE_KEY, location.objectKey());
+ }
+
+ @Test
+ void testParseSingleSegmentKey() {
+ final GoogleCloudStorageLocation location =
GoogleCloudStorageLocation.parse(SINGLE_SEGMENT_URI);
+ assertEquals(BUCKET, location.bucket());
+ assertEquals(OBJECT_PARQUET, location.objectKey());
+ }
+
+ @Test
+ void testParseDeepPath() {
+ final GoogleCloudStorageLocation location =
GoogleCloudStorageLocation.parse(DEEP_PATH_URI);
+ assertEquals(BUCKET, location.bucket());
+ assertEquals(DEEP_KEY, location.objectKey());
+ }
+
+ @Test
+ void testEncodedKeyEncodesSlashes() {
+ final GoogleCloudStorageLocation location =
GoogleCloudStorageLocation.parse(MULTI_SEGMENT_URI);
+ assertEquals(ENCODED_MULTI_SEGMENT, location.encodedObjectKey());
+ }
+
+ @Test
+ void testEncodedKeyEncodesSpaces() {
+ final GoogleCloudStorageLocation location =
GoogleCloudStorageLocation.parse(SPACES_URI);
+ assertEquals(ENCODED_SPACES, location.encodedObjectKey());
+ }
+
+ @Test
+ void testToString() {
+ final GoogleCloudStorageLocation location =
GoogleCloudStorageLocation.parse(SIMPLE_BUCKET_KEY_URI);
+ assertEquals(SIMPLE_BUCKET_KEY_URI, location.toString());
+ }
+
+ @Test
+ void testParseNullThrows() {
+ assertThrows(IllegalArgumentException.class, () ->
GoogleCloudStorageLocation.parse(null));
+ }
+
+ @Test
+ void testParseInvalidSchemeThrows() {
+ assertThrows(IllegalArgumentException.class, () ->
GoogleCloudStorageLocation.parse(INVALID_SCHEME_URI));
+ }
+
+ @Test
+ void testParseNoBucketThrows() {
+ assertThrows(IllegalArgumentException.class, () ->
GoogleCloudStorageLocation.parse(NO_BUCKET_URI));
+ }
+
+ @Test
+ void testParseNoKeyThrows() {
+ assertThrows(IllegalArgumentException.class, () ->
GoogleCloudStorageLocation.parse(NO_KEY_URI));
+ }
+
+ @Test
+ void testParseNoSlashThrows() {
+ assertThrows(IllegalArgumentException.class, () ->
GoogleCloudStorageLocation.parse(NO_SLASH_URI));
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/HttpClientProviderTest.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/HttpClientProviderTest.java
new file mode 100644
index 00000000000..4479a308bdf
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/HttpClientProviderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+
+import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
+import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class HttpClientProviderTest {
+
+ private static final int HTTP_TOO_MANY_REQUESTS = 429;
+
+ private static final Duration TEST_BACKOFF = Duration.ofMillis(10);
+
+ private static final String LOCALHOST_URI_FORMAT =
"http://localhost:%d/test";
+
+ private static final String GET_METHOD = "GET";
+
+ private static final int EXPECTED_REQUESTS_WITH_RETRY = 2;
+
+ private static final int EXPECTED_REQUESTS_RETRIES_EXHAUSTED = 3;
+
+ private MockWebServer mockWebServer;
+
+ private HttpClientProvider httpClientProvider;
+
+ @BeforeEach
+ void startServer() throws IOException {
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+ httpClientProvider = new HttpClientProvider(null, TEST_BACKOFF);
+ }
+
+ @AfterEach
+ void closeServer() {
+ httpClientProvider.close();
+ mockWebServer.close();
+ }
+
+ @Test
+ void testSendSuccess() throws IOException {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_OK)
+ .build());
+
+ final HttpResponse<String> response = sendGetRequest();
+
+ assertEquals(HTTP_OK, response.statusCode());
+ assertEquals(1, mockWebServer.getRequestCount());
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {HTTP_CLIENT_TIMEOUT, HTTP_TOO_MANY_REQUESTS,
HTTP_INTERNAL_ERROR, HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT})
+ void testSendRetriableStatusCodeRetried(final int statusCode) throws
IOException {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(statusCode)
+ .build());
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_OK)
+ .build());
+
+ final HttpResponse<String> response = sendGetRequest();
+
+ assertEquals(HTTP_OK, response.statusCode());
+ assertEquals(EXPECTED_REQUESTS_WITH_RETRY,
mockWebServer.getRequestCount());
+ }
+
+ @Test
+ void testSendForbiddenStatusCodeNotRetried() throws IOException {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_FORBIDDEN)
+ .build());
+
+ final HttpResponse<String> response = sendGetRequest();
+
+ assertEquals(HTTP_FORBIDDEN, response.statusCode());
+ assertEquals(1, mockWebServer.getRequestCount());
+ }
+
+ @Test
+ void testSendRetriesExhaustedReturnsLastResponse() throws IOException {
+ for (int i = 0; i < EXPECTED_REQUESTS_RETRIES_EXHAUSTED; i++) {
+ mockWebServer.enqueue(new MockResponse.Builder()
+ .code(HTTP_UNAVAILABLE)
+ .build());
+ }
+
+ final HttpResponse<String> response = sendGetRequest();
+
+ assertEquals(HTTP_UNAVAILABLE, response.statusCode());
+ assertEquals(EXPECTED_REQUESTS_RETRIES_EXHAUSTED,
mockWebServer.getRequestCount());
+ }
+
+ @Test
+ void testSendException() {
+ mockWebServer.close();
+
+ assertThrows(IOException.class, this::sendGetRequest);
+ }
+
+ private HttpResponse<String> sendGetRequest() throws IOException {
+ final String uri =
LOCALHOST_URI_FORMAT.formatted(mockWebServer.getPort());
+ final HttpRequest request = httpClientProvider.newRequestBuilder(uri)
+ .method(GET_METHOD, HttpRequest.BodyPublishers.noBody())
+ .build();
+ return httpClientProvider.send(request,
HttpResponse.BodyHandlers.ofString());
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProviderTest.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProviderTest.java
new file mode 100644
index 00000000000..4779fae7efe
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProviderTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class KeyDigestProviderTest {
+ private static final String INPUT_KEY = "0102030405060708";
+
+ private static final String DIGEST_EXPECTED =
"Kt9FTK2ulLNLyGIXrZ0ZGAUKQwOQNpffViNE76oTgp0=";
+
+ @Test
+ void testGetDigestEncoded() {
+ final String digestEncoded =
KeyDigestProvider.getDigestEncoded(INPUT_KEY);
+
+ assertEquals(DIGEST_EXPECTED, digestEncoded);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/pom.xml
b/nifi-extension-bundles/nifi-iceberg-bundle/pom.xml
index 4c34589558b..518309c079c 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/pom.xml
@@ -33,6 +33,8 @@
<module>nifi-iceberg-aws-nar</module>
<module>nifi-iceberg-azure</module>
<module>nifi-iceberg-azure-nar</module>
+ <module>nifi-iceberg-gcs</module>
+ <module>nifi-iceberg-gcs-nar</module>
<module>nifi-iceberg-parquet-writer</module>
<module>nifi-iceberg-parquet-writer-nar</module>
<module>nifi-iceberg-processors</module>