This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fb0beb0a3ff NIFI-15754 Add Google Cloud Storage Provider for Iceberg 
(#11052)
fb0beb0a3ff is described below

commit fb0beb0a3ff4ccc0adabd0a601e5f47e1095d063
Author: David Handermann <[email protected]>
AuthorDate: Mon Mar 30 16:02:53 2026 -0500

    NIFI-15754 Add Google Cloud Storage Provider for Iceberg (#11052)
    
    * NIFI-15754 Added Google Cloud Storage Provider for Iceberg
    
    - Added Iceberg GCS module and NAR
    - Added Iceberg FileIO implementation using Java HttpClient for GCS REST 
operations
    - Added retry strategy to HttpClientProvider
---
 nifi-assembly/pom.xml                              |   6 +
 .../nifi-iceberg-gcs-nar/pom.xml                   |  74 +++++
 .../src/main/resources/META-INF/LICENSE            | 209 +++++++++++++
 .../nifi-iceberg-bundle/nifi-iceberg-gcs/pom.xml   |  55 ++++
 .../iceberg/gcs/AuthenticationStrategy.java        |  50 +++
 .../iceberg/gcs/GCSIcebergFileIOProvider.java      |  62 ++++
 .../iceberg/gcs/GoogleCloudStorageFileIO.java      | 145 +++++++++
 .../iceberg/gcs/GoogleCloudStorageHeader.java      |  56 ++++
 .../iceberg/gcs/GoogleCloudStorageInputFile.java   | 169 ++++++++++
 .../iceberg/gcs/GoogleCloudStorageLocation.java    |  81 +++++
 .../iceberg/gcs/GoogleCloudStorageOutputFile.java  |  98 ++++++
 .../GoogleCloudStoragePositionOutputStream.java    | 225 ++++++++++++++
 .../iceberg/gcs/GoogleCloudStorageProperties.java  | 140 +++++++++
 .../iceberg/gcs/GoogleCloudStorageProperty.java    |  47 +++
 .../gcs/GoogleCloudStorageSeekableInputStream.java | 284 +++++++++++++++++
 .../services/iceberg/gcs/HttpClientProvider.java   | 154 ++++++++++
 .../services/iceberg/gcs/HttpRequestException.java |  27 ++
 .../iceberg/gcs/HttpResponseException.java         |  27 ++
 .../services/iceberg/gcs/KeyDigestProvider.java    |  43 +++
 .../org.apache.nifi.controller.ControllerService   |  15 +
 .../iceberg/gcs/GCSIcebergFileIOProviderTest.java  |  89 ++++++
 .../iceberg/gcs/GoogleCloudStorageFileIOTest.java  | 341 +++++++++++++++++++++
 .../gcs/GoogleCloudStorageLocationTest.java        | 109 +++++++
 .../iceberg/gcs/HttpClientProviderTest.java        | 141 +++++++++
 .../iceberg/gcs/KeyDigestProviderTest.java         |  34 ++
 nifi-extension-bundles/nifi-iceberg-bundle/pom.xml |   2 +
 26 files changed, 2683 insertions(+)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index def628fa54e..eb0d6e21918 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -885,6 +885,12 @@ language governing permissions and limitations under the 
License. -->
             <version>2.9.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-gcs-nar</artifactId>
+            <version>2.9.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-iceberg-parquet-writer-nar</artifactId>
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/pom.xml 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/pom.xml
new file mode 100644
index 00000000000..2de9b861b8a
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <version>2.9.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-iceberg-gcs-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-gcs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-service-api-nar</artifactId>
+            <version>${project.version}</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+            <!-- Provided in shared NAR -->
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-api</artifactId>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-core</artifactId>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-common</artifactId>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-bundled-guava</artifactId>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro</artifactId>
+                <scope>provided</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/src/main/resources/META-INF/LICENSE
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 00000000000..44893cdb29d
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses. 
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/pom.xml 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/pom.xml
new file mode 100644
index 00000000000..d830b801552
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-iceberg-bundle</artifactId>
+        <version>2.9.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-iceberg-gcs</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-service-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Provided in shared NAR -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Test Dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver3</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/AuthenticationStrategy.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/AuthenticationStrategy.java
new file mode 100644
index 00000000000..97b8e936dd7
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/AuthenticationStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Authentication Types for Google Cloud Storage 
access
+ */
+public enum AuthenticationStrategy implements DescribedValue {
+    VENDED_CREDENTIALS("Vended Credentials", "Authentication using credentials 
supplied from Iceberg Catalog");
+
+    private final String displayName;
+
+    private final String description;
+
+    AuthenticationStrategy(final String displayName, final String description) 
{
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProvider.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProvider.java
new file mode 100644
index 00000000000..7a5c5a648b6
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.FileIO;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergFileIOProvider;
+import org.apache.nifi.services.iceberg.ProviderContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@Tags({"google", "cloud", "storage", "iceberg", "gcp"})
+@CapabilityDescription("Provides Google Cloud Storage file input and output 
support for Apache Iceberg tables")
+public class GCSIcebergFileIOProvider extends AbstractControllerService 
implements IcebergFileIOProvider {
+
+    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Authentication Strategy")
+            .description("Strategy for authenticating with Google Cloud 
Storage services")
+            .required(true)
+            .allowableValues(AuthenticationStrategy.class)
+            .defaultValue(AuthenticationStrategy.VENDED_CREDENTIALS)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+            AUTHENTICATION_STRATEGY
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public FileIO getFileIO(final ProviderContext providerContext) {
+        Objects.requireNonNull(providerContext, "Provider Context required");
+        final Map<String, String> contextProperties = 
providerContext.getProperties();
+        Objects.requireNonNull(contextProperties, "Context properties 
required");
+
+        final GoogleCloudStorageFileIO fileIO = new GoogleCloudStorageFileIO();
+        fileIO.initialize(contextProperties);
+        return fileIO;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIO.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIO.java
new file mode 100644
index 00000000000..1771b021a24
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIO.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serial;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Collections;
+import java.util.Map;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_NO_CONTENT;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.OAUTH2_TOKEN;
+
+/**
+ * Google Cloud Storage implementation of Iceberg FileIO using Java HttpClient 
for REST API operations
+ */
+class GoogleCloudStorageFileIO implements FileIO {
+
+    @Serial
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(GoogleCloudStorageFileIO.class);
+
+    private Map<String, String> properties = Collections.emptyMap();
+
+    private transient GoogleCloudStorageProperties storageProperties;
+    private transient HttpClientProvider httpClientProvider;
+
+    /**
+     * Initialize FileIO with standard properties defined in Apache Iceberg 
GCPProperties class
+     *
+     * @param properties Properties defined according to Iceberg GCPProperties
+     */
+    @Override
+    public void initialize(final Map<String, String> properties) {
+        this.properties = Map.copyOf(properties);
+        this.storageProperties = new GoogleCloudStorageProperties(properties);
+
+        final String bearerToken = properties.get(OAUTH2_TOKEN.getProperty());
+        this.httpClientProvider = new HttpClientProvider(bearerToken);
+    }
+
+    /**
+     * Create Iceberg Input File with unspecified length
+     *
+     * @param path Input File Path
+     * @return Input File with unspecified length
+     */
+    @Override
+    public InputFile newInputFile(final String path) {
+        return new GoogleCloudStorageInputFile(httpClientProvider, 
storageProperties, GoogleCloudStorageLocation.parse(path), path, null);
+    }
+
+    /**
+     * Create Iceberg Input File with length specified
+     *
+     * @param path Input File Path
+     * @param length Input File Length in bytes
+     * @return Input File with length specified
+     */
+    @Override
+    public InputFile newInputFile(final String path, final long length) {
+        return new GoogleCloudStorageInputFile(httpClientProvider, 
storageProperties, GoogleCloudStorageLocation.parse(path), path, length);
+    }
+
+    /**
+     * Create Iceberg Output File
+     *
+     * @param path Output File Path
+     * @return Output File
+     */
+    @Override
+    public OutputFile newOutputFile(final String path) {
+        return new GoogleCloudStorageOutputFile(httpClientProvider, 
storageProperties, GoogleCloudStorageLocation.parse(path), path);
+    }
+
+    /**
+     * Delete File at specified location
+     *
+     * @param path Location of file to be deleted
+     */
+    @Override
+    public void deleteFile(final String path) {
+        final GoogleCloudStorageLocation location = 
GoogleCloudStorageLocation.parse(path);
+        final String uri = storageProperties.metadataUri(location);
+        final HttpRequest request = 
httpClientProvider.newRequestBuilder(uri).DELETE().build();
+        try {
+            final HttpResponse<String> response = 
httpClientProvider.send(request, HttpResponse.BodyHandlers.ofString());
+            final int statusCode = response.statusCode();
+
+            if (HTTP_NO_CONTENT == statusCode || HTTP_NOT_FOUND == statusCode) 
{
+                logger.debug("Delete File [{}] completed: HTTP {}", path, 
statusCode);
+            } else {
+                final String responseBody = response.body();
+                throw new HttpResponseException("Delete File [%s] failed: HTTP 
%d [%s]".formatted(path, statusCode, responseBody));
+            }
+        } catch (final IOException e) {
+            throw new HttpRequestException("Delete File [%s] 
failed".formatted(path), e);
+        }
+    }
+
+    /**
+     * Get current configuration properties
+     *
+     * @return Configuration properties
+     */
+    @Override
+    public Map<String, String> properties() {
+        return properties;
+    }
+
+    /**
+     * Close client resources
+     */
+    @Override
+    public void close() {
+        if (httpClientProvider == null) {
+            logger.warn("Closed before initialized");
+        } else {
+            httpClientProvider.close();
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageHeader.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageHeader.java
new file mode 100644
index 00000000000..a6ab5f53fff
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageHeader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+/**
+ * HTTP header names used in Google Cloud Storage JSON API requests and 
responses
+ */
+enum GoogleCloudStorageHeader {
+    /** RFC 9110 Section 11.6.2 */
+    AUTHORIZATION("Authorization"),
+
+    /** RFC 9110 Section 14.4 */
+    CONTENT_RANGE("Content-Range"),
+
+    /** Google Header for customer-supplied encryption keys */
+    ENCRYPTION_ALGORITHM("x-goog-encryption-algorithm"),
+
+    /** Google Header for customer-supplied encryption keys containing 
Base64-encoded values */
+    ENCRYPTION_KEY("x-goog-encryption-key"),
+
+    /** Google Header for customer-supplied encryption keys containing 
Base64-encoded values */
+    ENCRYPTION_KEY_SHA256("x-goog-encryption-key-sha256"),
+
+    /** RFC 9110 Section 10.2.2 */
+    LOCATION("Location"),
+
+    /** RFC 9110 Section 14.2 */
+    RANGE("Range"),
+
+    /** Content Type for payload of Google Cloud Storage resumable uploads */
+    UPLOAD_CONTENT_TYPE("X-Upload-Content-Type");
+
+    private final String header;
+
+    GoogleCloudStorageHeader(final String header) {
+        this.header = header;
+    }
+
+    String getHeader() {
+        return header;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageInputFile.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageInputFile.java
new file mode 100644
index 00000000000..1641c8e601f
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageInputFile.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+import java.io.IOException;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+
+/**
+ * Google Cloud Storage implementation of Apache Iceberg InputFile using 
HttpClient for REST operations
+ */
+class GoogleCloudStorageInputFile implements InputFile {
+
+    private static final String QUESTION_MARK = "?";
+    private static final char AMPERSAND = '&';
+    private static final String FIELDS_SIZE_QUERY = "fields=size";
+
+    private static final Pattern SIZE_PATTERN = 
Pattern.compile("\"size\"\\s*:\\s*\"(\\d+)\"");
+    private static final int SIZE_GROUP = 1;
+
+    private final HttpClientProvider httpClientProvider;
+
+    private final GoogleCloudStorageProperties storageProperties;
+
+    private final GoogleCloudStorageLocation location;
+
+    private final String path;
+
+    private volatile Long cachedLength;
+
+    GoogleCloudStorageInputFile(
+            final HttpClientProvider httpClientProvider,
+            final GoogleCloudStorageProperties storageProperties,
+            final GoogleCloudStorageLocation location,
+            final String path,
+            final Long cachedLength
+    ) {
+        this.httpClientProvider = httpClientProvider;
+        this.storageProperties = storageProperties;
+        this.location = location;
+        this.path = path;
+        this.cachedLength = cachedLength;
+    }
+
+    /**
+     * Get Input File length in bytes and fetch remote object size when length 
is not cached
+     *
+     * @return Input File length in bytes
+     */
+    @Override
+    public long getLength() {
+        if (cachedLength == null) {
+            cachedLength = getObjectSize();
+        }
+        return cachedLength;
+    }
+
+    /**
+     * Create Seekable InputStream for InputFile reading
+     *
+     * @return Seekable InputStream
+     */
+    @Override
+    public SeekableInputStream newStream() {
+        return new GoogleCloudStorageSeekableInputStream(httpClientProvider, 
storageProperties, location, cachedLength);
+    }
+
+    /**
+     * Get InputFile Location
+     *
+     * @return InputFile Location
+     */
+    @Override
+    public String location() {
+        return path;
+    }
+
+    /**
+     * Get status of InputFile existence based on Metadata URI
+     *
+     * @return InputFile existence status
+     */
+    @Override
+    public boolean exists() {
+        final String uri = getMetadataUri();
+        final HttpRequest request = 
httpClientProvider.newRequestBuilder(uri).GET().build();
+
+        try {
+            final HttpResponse<Void> response = 
httpClientProvider.send(request, HttpResponse.BodyHandlers.discarding());
+            final int statusCode = response.statusCode();
+
+            final boolean exists;
+            if (statusCode == HTTP_OK) {
+                exists = true;
+            } else if (statusCode == HTTP_NOT_FOUND) {
+                exists = false;
+            } else {
+                throw new HttpResponseException("Metadata URI [%s] request 
failed: HTTP %d".formatted(uri, statusCode));
+            }
+
+            return exists;
+        } catch (final IOException e) {
+            throw new HttpRequestException("Metadata URI [%s] request 
failed".formatted(uri), e);
+        }
+    }
+
+    private long getObjectSize() {
+        final String uri = getMetadataUri();
+        final HttpRequest request = 
httpClientProvider.newRequestBuilder(uri).GET().build();
+
+        try {
+            final HttpResponse<String> response = 
httpClientProvider.send(request, HttpResponse.BodyHandlers.ofString());
+            final int statusCode = response.statusCode();
+            final String responseBody = response.body();
+
+            if (HTTP_OK == statusCode) {
+                final Matcher sizeMatcher = SIZE_PATTERN.matcher(responseBody);
+                if (sizeMatcher.find()) {
+                    final String sizeGroup = sizeMatcher.group(SIZE_GROUP);
+                    return Long.parseLong(sizeGroup);
+                } else {
+                    throw new HttpResponseException("Metadata URI [%s] 
response parsing failed: HTTP %d size not found [%s]".formatted(uri, 
statusCode, responseBody));
+                }
+            } else {
+                throw new HttpResponseException("Metadata URI [%s] request 
failed: HTTP %d [%s]".formatted(uri, statusCode, responseBody));
+            }
+        } catch (final IOException e) {
+            throw new HttpRequestException("Metadata URI [%s] request 
failed".formatted(uri), e);
+        }
+    }
+
+    private String getMetadataUri() {
+        final String baseUri = storageProperties.metadataUri(location);
+
+        final StringBuilder builder = new StringBuilder(baseUri);
+
+        if (baseUri.contains(QUESTION_MARK)) {
+            builder.append(AMPERSAND);
+        } else {
+            builder.append(QUESTION_MARK);
+        }
+
+        builder.append(FIELDS_SIZE_QUERY);
+
+        return builder.toString();
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocation.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocation.java
new file mode 100644
index 00000000000..2d617a4b6bd
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Parsed representation of Google Cloud Storage URI
+ */
+record GoogleCloudStorageLocation(
+        String bucket,
+        String objectKey,
+        String encodedObjectKey
+) {
+
+    private static final String GS_SCHEME = "gs://";
+
+    private static final char FORWARD_SLASH = '/';
+
+    private static final String PLUS = "+";
+
+    private static final String SPACE_ENCODED = "%20";
+
+    private static final String URI_FORMAT = "gs://%s/%s";
+
+    /**
+     * Parse URI to Google Cloud Storage Location
+     *
+     * @param uri Google Cloud Storage URI
+     * @return Parsed Location
+     */
+    static GoogleCloudStorageLocation parse(final String uri) {
+        if (uri == null) {
+            throw new IllegalArgumentException("URI required");
+        }
+
+        if (uri.startsWith(GS_SCHEME)) {
+            final String path = uri.substring(GS_SCHEME.length());
+            final int forwardSlashIndex = path.indexOf(FORWARD_SLASH);
+            if (forwardSlashIndex < 1) {
+                throw new IllegalArgumentException("URI [%s] missing bucket 
and object objectKey".formatted(uri));
+            }
+
+            final String bucket = path.substring(0, forwardSlashIndex);
+            final String objectKey = path.substring(forwardSlashIndex + 1);
+            if (objectKey.isEmpty()) {
+                throw new IllegalArgumentException("URI [%s] empty object 
objectKey".formatted(uri));
+            }
+
+            final String encodedObjectKey = getEncodedObjectKey(objectKey);
+            return new GoogleCloudStorageLocation(bucket, objectKey, 
encodedObjectKey);
+        } else {
+            throw new IllegalArgumentException("URI [%s] missing required 
scheme [gs]".formatted(uri));
+        }
+    }
+
+    private static String getEncodedObjectKey(final String objectKey) {
+        final String urlEncodedObjectKey = URLEncoder.encode(objectKey, 
StandardCharsets.UTF_8);
+        return urlEncodedObjectKey.replace(PLUS, SPACE_ENCODED);
+    }
+
+    @Override
+    public String toString() {
+        return URI_FORMAT.formatted(bucket, objectKey);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageOutputFile.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageOutputFile.java
new file mode 100644
index 00000000000..0127999f47a
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageOutputFile.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/**
+ * Google Cloud Storage implementation of Apache Iceberg OutputFile using 
HttpClient for REST Operations
+ */
+class GoogleCloudStorageOutputFile implements OutputFile {
+
+    private final HttpClientProvider httpClientProvider;
+
+    private final GoogleCloudStorageProperties storageProperties;
+
+    private final GoogleCloudStorageLocation location;
+
+    private final String path;
+
+    GoogleCloudStorageOutputFile(
+            final HttpClientProvider httpClientProvider,
+            final GoogleCloudStorageProperties storageProperties,
+            final GoogleCloudStorageLocation location,
+            final String path
+    ) {
+        this.httpClientProvider = httpClientProvider;
+        this.storageProperties = storageProperties;
+        this.location = location;
+        this.path = path;
+    }
+
+    /**
+     * Create OutputStream after checking for existing InputFile to avoid 
potential conflicts
+     *
+     * @return OutputStream for writing
+     */
+    @Override
+    public PositionOutputStream create() {
+        if (toInputFile().exists()) {
+            throw new AlreadyExistsException("File already exists [%s]", path);
+        }
+        return createOrOverwrite();
+    }
+
+    /**
+     * Create OutputStream or overwrite existing file when necessary
+     *
+     * @return OutputStream for writing
+     */
+    @Override
+    public PositionOutputStream createOrOverwrite() {
+        try {
+            return new 
GoogleCloudStoragePositionOutputStream(httpClientProvider, storageProperties, 
location);
+        } catch (final IOException e) {
+            throw new UncheckedIOException("Failed to create stream 
[%s]".formatted(path), e);
+        }
+    }
+
+    /**
+     * Get Location of OutputFile
+     *
+     * @return Location
+     */
+    @Override
+    public String location() {
+        return path;
+    }
+
+    /**
+     * Get InputFile corresponding to location of current OutputFile
+     *
+     * @return InputFile for current location
+     */
+    @Override
+    public InputFile toInputFile() {
+        return new GoogleCloudStorageInputFile(httpClientProvider, 
storageProperties, location, path, null);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStoragePositionOutputStream.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStoragePositionOutputStream.java
new file mode 100644
index 00000000000..9854e996e29
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStoragePositionOutputStream.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.PositionOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Optional;
+
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.CONTENT_RANGE;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_ALGORITHM;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_KEY;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_KEY_SHA256;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.LOCATION;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.UPLOAD_CONTENT_TYPE;
+
+/**
+ * Google Cloud Storage implementation of PositionOutputStream supporting 
resumable uploads
+ */
+class GoogleCloudStoragePositionOutputStream extends PositionOutputStream {
+
+    private static final String CONTENT_TYPE_OCTET_STREAM = 
"application/octet-stream";
+    private static final String CONTENT_RANGE_FORMAT = "bytes %d-%d/%s";
+    private static final String RANGE_TOTAL_UNKNOWN = "*";
+
+    private static final int HTTP_RESUME_INCOMPLETE = 308;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(GoogleCloudStoragePositionOutputStream.class);
+
+    private final HttpClientProvider httpClientProvider;
+    private final GoogleCloudStorageProperties storageProperties;
+    private final String sessionUri;
+    private final byte[] buffer;
+    private final int chunkSize;
+    private int bufferPosition;
+    private long position;
+    private boolean closed;
+
+    /**
+     * Google Cloud Storage Position OutputStream constructor initiates a 
resumable upload on creation
+     *
+     * @param httpClientProvider HTTP Client Provider
+     * @param storageProperties Google Cloud Storage Properties
+     * @param location Google Cloud Storage Location
+     * @throws IOException Thrown on failure to initiate upload
+     */
+    GoogleCloudStoragePositionOutputStream(
+            final HttpClientProvider httpClientProvider,
+            final GoogleCloudStorageProperties storageProperties,
+            final GoogleCloudStorageLocation location
+    ) throws IOException {
+        this.httpClientProvider = httpClientProvider;
+        this.storageProperties = storageProperties;
+        this.chunkSize = storageProperties.writeChunkSize();
+        this.buffer = new byte[chunkSize];
+        this.sessionUri = initiateUpload(location);
+    }
+
+    /**
+     * Get stream position
+     *
+     * @return Current stream position
+     */
+    @Override
+    public long getPos() {
+        return position;
+    }
+
+    /**
+     * Write byte to stream and increment position
+     *
+     * @param data byte to be written
+     * @throws IOException Thrown on failure to send chunk
+     */
+    @Override
+    public void write(final int data) throws IOException {
+        ensureOpen();
+        if (bufferPosition >= chunkSize) {
+            sendChunk(false);
+        }
+        buffer[bufferPosition++] = (byte) data;
+        position++;
+    }
+
+    /**
+     * Write bytes to stream and increment position
+     *
+     * @param data bytes to be written
+     * @param offset initial offset in bytes to be written
+     * @param length number of bytes to be written
+     * @throws IOException Thrown on failure to send chunk
+     */
+    @Override
+    public void write(final byte[] data, final int offset, final int length) 
throws IOException {
+        ensureOpen();
+
+        int remaining = length;
+        int currentOffset = offset;
+        while (remaining > 0) {
+            if (bufferPosition >= chunkSize) {
+                sendChunk(false);
+            }
+            final int space = chunkSize - bufferPosition;
+            final int copyLength = Math.min(remaining, space);
+
+            System.arraycopy(data, currentOffset, buffer, bufferPosition, 
copyLength);
+            bufferPosition += copyLength;
+            currentOffset += copyLength;
+            remaining -= copyLength;
+            position += copyLength;
+        }
+    }
+
+    /**
+     * Close stream and send final chunk
+     *
+     * @throws IOException Thrown on failure to send chunk
+     */
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+            closed = true;
+            sendChunk(true);
+        }
+    }
+
+    private void ensureOpen() throws IOException {
+        if (closed) {
+            throw new IOException("Stream closed");
+        }
+    }
+
+    private String initiateUpload(final GoogleCloudStorageLocation location) 
throws IOException {
+        final String uri = storageProperties.uploadUri(location);
+        final HttpRequest.Builder builder = 
httpClientProvider.newRequestBuilder(uri)
+                .POST(HttpRequest.BodyPublishers.noBody())
+                .header(UPLOAD_CONTENT_TYPE.getHeader(), 
CONTENT_TYPE_OCTET_STREAM);
+
+        addEncryptionHeaders(builder);
+
+        final HttpResponse<String> response = 
httpClientProvider.send(builder.build(), HttpResponse.BodyHandlers.ofString());
+        final int statusCode = response.statusCode();
+        if (HTTP_OK == statusCode) {
+            final HttpHeaders headers = response.headers();
+            final Optional<String> locationHeaderFound = 
headers.firstValue(LOCATION.getHeader());
+            if (locationHeaderFound.isPresent()) {
+                logger.debug("Upload initiated [{}] HTTP {}", uri, statusCode);
+                return locationHeaderFound.get();
+            } else {
+                throw new IOException("Initiate upload failed [%s] Location 
response header not found".formatted(location));
+            }
+        } else {
+            final String responseBody = response.body();
+            throw new IOException("Initiate upload failed [%s] HTTP %d 
[%s]".formatted(location, statusCode, responseBody));
+        }
+    }
+
+    private void sendChunk(final boolean lastChunk) throws IOException {
+        if (bufferPosition == 0 && !lastChunk) {
+            return;
+        }
+
+        final HttpRequest.Builder builder = 
HttpRequest.newBuilder(URI.create(sessionUri))
+                .timeout(HttpClientProvider.REQUEST_TIMEOUT);
+
+        if (bufferPosition == 0) {
+            builder.PUT(HttpRequest.BodyPublishers.noBody());
+        } else {
+            final long rangeStart = position - bufferPosition;
+            final long rangeEnd = position - 1;
+            final String total = lastChunk ? String.valueOf(position) : 
RANGE_TOTAL_UNKNOWN;
+            final String contentRange = 
CONTENT_RANGE_FORMAT.formatted(rangeStart, rangeEnd, total);
+
+            builder.PUT(HttpRequest.BodyPublishers.ofByteArray(buffer, 0, 
bufferPosition))
+                    .header(CONTENT_RANGE.getHeader(), contentRange);
+        }
+
+        final HttpResponse<String> response = 
httpClientProvider.send(builder.build(), HttpResponse.BodyHandlers.ofString());
+        final int statusCode = response.statusCode();
+        logger.debug("Buffer uploaded [{}] HTTP {}", sessionUri, statusCode);
+
+        if (lastChunk) {
+            if (statusCode != HTTP_OK && statusCode != HTTP_CREATED) {
+                throw new IOException("Upload finalization failed HTTP %d 
[%s]".formatted(statusCode, response.body()));
+            }
+        } else {
+            if (statusCode != HTTP_RESUME_INCOMPLETE) {
+                throw new IOException("Chunk upload failed HTTP %d 
[%s]".formatted(statusCode, response.body()));
+            }
+        }
+
+        bufferPosition = 0;
+    }
+
+    private void addEncryptionHeaders(final HttpRequest.Builder builder) {
+        final String key = storageProperties.encryptionKey();
+        if (key != null && !key.isBlank()) {
+            builder.header(ENCRYPTION_ALGORITHM.getHeader(), 
storageProperties.encryptionAlgorithm());
+            builder.header(ENCRYPTION_KEY.getHeader(), key);
+            builder.header(ENCRYPTION_KEY_SHA256.getHeader(), 
KeyDigestProvider.getDigestEncoded(key));
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperties.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperties.java
new file mode 100644
index 00000000000..701aca1bf87
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperties.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.DECRYPTION_KEY;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.ENCRYPTION_KEY;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.READ_CHUNK_SIZE_BYTES;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.SERVICE_HOST;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.USER_PROJECT;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.WRITE_CHUNK_SIZE_BYTES;
+
+/**
+ * Google Cloud Storage Properties encapsulating value parsing and URI 
resolution
+ */
+class GoogleCloudStorageProperties {
+
+    static final String DEFAULT_SERVICE_HOST = 
"https://storage.googleapis.com";;
+    static final int DEFAULT_WRITE_CHUNK_SIZE = 8 * 1024 * 1024;
+    static final int DEFAULT_READ_CHUNK_SIZE = 2 * 1024 * 1024;
+
+    private static final String METADATA_URI_FORMAT = 
"%s/storage/v1/b/%s/o/%s";
+    private static final String UPLOAD_URI_FORMAT = 
"%s/upload/storage/v1/b/%s/o?uploadType=resumable&name=%s";
+    private static final String DOWNLOAD_URI_FORMAT = 
"%s/storage/v1/b/%s/o/%s?alt=media";
+    private static final String USER_PROJECT_FIRST_FORMAT = 
"%s?userProject=%s";
+    private static final String USER_PROJECT_ADDITIONAL_FORMAT = 
"%s&userProject=%s";
+
+    private static final String ENCRYPTION_ALGORITHM = "AES256";
+
+    private final String serviceHost;
+    private final String userProject;
+    private final String encryptionKey;
+    private final String decryptionKey;
+    private final int writeChunkSize;
+    private final int readChunkSize;
+
+    GoogleCloudStorageProperties(final Map<String, String> properties) {
+        Objects.requireNonNull(properties, "Properties required");
+
+        final String serviceHost = properties.get(SERVICE_HOST.getProperty());
+        if (serviceHost == null || serviceHost.isBlank()) {
+            this.serviceHost = DEFAULT_SERVICE_HOST;
+        } else {
+            this.serviceHost = serviceHost;
+        }
+
+        this.userProject = properties.get(USER_PROJECT.getProperty());
+        this.encryptionKey = properties.get(ENCRYPTION_KEY.getProperty());
+        this.decryptionKey = properties.get(DECRYPTION_KEY.getProperty());
+
+        final String writeChunkSizeBytes = 
properties.get(WRITE_CHUNK_SIZE_BYTES.getProperty());
+        if (writeChunkSizeBytes == null) {
+            this.writeChunkSize = DEFAULT_WRITE_CHUNK_SIZE;
+        } else {
+            this.writeChunkSize = Integer.parseInt(writeChunkSizeBytes);
+        }
+
+        final String readChunkSizeBytes = 
properties.get(READ_CHUNK_SIZE_BYTES.getProperty());
+        if (readChunkSizeBytes == null) {
+            this.readChunkSize = DEFAULT_READ_CHUNK_SIZE;
+        } else {
+            this.readChunkSize = Integer.parseInt(readChunkSizeBytes);
+        }
+    }
+
+    String encryptionAlgorithm() {
+        return ENCRYPTION_ALGORITHM;
+    }
+
+    String encryptionKey() {
+        return encryptionKey;
+    }
+
+    String decryptionKey() {
+        return decryptionKey;
+    }
+
+    int writeChunkSize() {
+        return writeChunkSize;
+    }
+
+    int readChunkSize() {
+        return readChunkSize;
+    }
+
+    String metadataUri(final GoogleCloudStorageLocation location) {
+        final String uri = METADATA_URI_FORMAT.formatted(serviceHost, 
location.bucket(), location.encodedObjectKey());
+
+        final String metadataUri;
+        if (userProject == null) {
+            metadataUri = uri;
+        } else {
+            metadataUri = USER_PROJECT_FIRST_FORMAT.formatted(uri, 
userProject);
+        }
+
+        return metadataUri;
+    }
+
+    String uploadUri(final GoogleCloudStorageLocation location) {
+        final String uri = UPLOAD_URI_FORMAT.formatted(serviceHost, 
location.bucket(), location.encodedObjectKey());
+
+        final String uploadUri;
+        if (userProject == null) {
+            uploadUri = uri;
+        } else {
+            uploadUri = USER_PROJECT_ADDITIONAL_FORMAT.formatted(uri, 
userProject);
+        }
+
+        return uploadUri;
+    }
+
+    String downloadUri(final GoogleCloudStorageLocation location) {
+        final String uri = DOWNLOAD_URI_FORMAT.formatted(serviceHost, 
location.bucket(), location.encodedObjectKey());
+
+        final String downloadUri;
+        if (userProject == null) {
+            downloadUri = uri;
+        } else {
+            downloadUri = USER_PROJECT_ADDITIONAL_FORMAT.formatted(uri, 
userProject);
+        }
+
+        return downloadUri;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperty.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperty.java
new file mode 100644
index 00000000000..11e785e5df4
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperty.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+/**
+ * Google Cloud Storage configuration properties aligned with Apache Iceberg 
GCPProperties
+ */
+enum GoogleCloudStorageProperty {
+
+    OAUTH2_TOKEN("gcs.oauth2.token"),
+
+    SERVICE_HOST("gcs.service.host"),
+
+    WRITE_CHUNK_SIZE_BYTES("gcs.channel.write.chunk-size-bytes"),
+
+    READ_CHUNK_SIZE_BYTES("gcs.channel.read.chunk-size-bytes"),
+
+    USER_PROJECT("gcs.user-project"),
+
+    ENCRYPTION_KEY("gcs.encryption-key"),
+
+    DECRYPTION_KEY("gcs.decryption-key");
+
+    private final String property;
+
+    GoogleCloudStorageProperty(final String property) {
+        this.property = property;
+    }
+
+    String getProperty() {
+        return property;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageSeekableInputStream.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageSeekableInputStream.java
new file mode 100644
index 00000000000..1cb718dede5
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageSeekableInputStream.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PARTIAL;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.CONTENT_RANGE;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_ALGORITHM;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_KEY;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.ENCRYPTION_KEY_SHA256;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.RANGE;
+
+/**
+ * Google Cloud Storage implementation of SeekableInputStream supporting 
chunked buffered reads
+ */
+class GoogleCloudStorageSeekableInputStream extends SeekableInputStream {
+
+    private static final Pattern CONTENT_RANGE_TOTAL = 
Pattern.compile("/(\\d+)$");
+    private static final int FIRST_GROUP = 1;
+    private static final int END_OF_STREAM = -1;
+
+    private static final String RANGE_FORMAT = "bytes=%d-%d";
+
+    private static final int HTTP_RANGE_NOT_SATISFIABLE = 416;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(GoogleCloudStorageSeekableInputStream.class);
+
+    private final HttpClientProvider httpClientProvider;
+    private final GoogleCloudStorageLocation location;
+    private final int readChunkSize;
+    private final String downloadUri;
+    private final String decryptionKey;
+    private final String encryptionAlgorithm;
+
+    private long pos;
+    private long contentLength;
+    private boolean contentLengthProvided;
+    private byte[] buffer;
+    private long bufferStart;
+    private int bufferLength;
+    private boolean closed;
+
+    /**
+     * Google Cloud Storage SeekableInputStream constructor with optional 
Content Length
+     *
+     * @param httpClientProvider HTTP Client Provider
+     * @param storageProperties Google Cloud Storage Properties
+     * @param location Google Cloud Storage Location
+     * @param contentLength Content Length or null when not known
+     */
+    GoogleCloudStorageSeekableInputStream(
+            final HttpClientProvider httpClientProvider,
+            final GoogleCloudStorageProperties storageProperties,
+            final GoogleCloudStorageLocation location,
+            final Long contentLength
+    ) {
+        this.httpClientProvider = httpClientProvider;
+        this.location = location;
+        this.readChunkSize = storageProperties.readChunkSize();
+        this.downloadUri = storageProperties.downloadUri(location);
+        this.decryptionKey = storageProperties.decryptionKey();
+        this.encryptionAlgorithm = storageProperties.encryptionAlgorithm();
+        if (contentLength != null) {
+            this.contentLength = contentLength;
+            this.contentLengthProvided = true;
+        }
+    }
+
+    /**
+     * Get current stream position
+     *
+     * @return Current position
+     */
+    @Override
+    public long getPos() {
+        return pos;
+    }
+
+    /**
+     * Seek to new position
+     *
+     * @param newPos New position
+     */
+    @Override
+    public void seek(final long newPos) {
+        if (newPos < 0) {
+            throw new IllegalArgumentException("Seek position must not be 
negative [%d]".formatted(newPos));
+        }
+        this.pos = newPos;
+    }
+
+    /**
+     * Read data and buffer as needed
+     *
+     * @return Data byte read
+     * @throws IOException Thrown on failure to buffer
+     */
+    @Override
+    public int read() throws IOException {
+        ensureOpen();
+        if (contentLengthProvided && pos >= contentLength) {
+            return END_OF_STREAM;
+        }
+
+        ensureBuffer();
+        if (bufferLength == 0) {
+            return END_OF_STREAM;
+        }
+
+        final int offset = (int) (pos - bufferStart);
+        if (offset >= bufferLength) {
+            return END_OF_STREAM;
+        }
+
+        pos++;
+        return buffer[offset] & 0xFF;
+    }
+
+    /**
+     * Read data into buffer with start offset and length requested
+     *
+     * @param data Data buffer for bytes read
+     * @param offset Start offset for data buffer
+     * @param length Maximum number of bytes to read
+     * @return Number of bytes read
+     * @throws IOException Thrown on failure to buffer
+     */
+    @Override
+    public int read(final byte[] data, final int offset, final int length) 
throws IOException {
+        ensureOpen();
+        if (length == 0) {
+            return 0;
+        }
+        if (contentLengthProvided && pos >= contentLength) {
+            return END_OF_STREAM;
+        }
+
+        int totalRead = 0;
+        int remaining = length;
+        int dataOffset = offset;
+
+        while (remaining > 0) {
+            ensureBuffer();
+            if (bufferLength == 0) {
+                break;
+            }
+            final int bufOffset = (int) (pos - bufferStart);
+            if (bufOffset >= bufferLength) {
+                break;
+            }
+            final int available = bufferLength - bufOffset;
+            final int copyLength = Math.min(remaining, available);
+            System.arraycopy(buffer, bufOffset, data, dataOffset, copyLength);
+            pos += copyLength;
+            totalRead += copyLength;
+            dataOffset += copyLength;
+            remaining -= copyLength;
+        }
+
+        return totalRead == 0 ? END_OF_STREAM : totalRead;
+    }
+
+    /**
+     * Get number of bytes available in buffer for reading
+     *
+     * @return Number of bytes available for reading
+     */
+    @Override
+    public int available() {
+        if (buffer == null) {
+            return 0;
+        }
+        final int bufferOffset = (int) (pos - bufferStart);
+        final int bufferRemaining = bufferLength - bufferOffset;
+        return Math.max(0, bufferRemaining);
+    }
+
+    /**
+     * Close stream and clear buffer
+     */
+    @Override
+    public void close() {
+        closed = true;
+        buffer = null;
+    }
+
+    private void ensureOpen() throws IOException {
+        if (closed) {
+            throw new IOException("Stream closed");
+        }
+    }
+
+    private void ensureBuffer() throws IOException {
+        if (buffer != null) {
+            final int offset = (int) (pos - bufferStart);
+            if (offset >= 0 && offset < bufferLength) {
+                return;
+            }
+        }
+        fillBuffer();
+    }
+
+    private void fillBuffer() throws IOException {
+        final long rangeStart = pos;
+        final long rangeEnd = rangeStart + readChunkSize - 1;
+        final String range = RANGE_FORMAT.formatted(rangeStart, rangeEnd);
+
+        final HttpRequest.Builder builder = 
httpClientProvider.newRequestBuilder(downloadUri)
+                .GET()
+                .header(RANGE.getHeader(), range);
+
+        addDecryptionHeaders(builder);
+
+        final HttpResponse<byte[]> response = 
httpClientProvider.send(builder.build(), 
HttpResponse.BodyHandlers.ofByteArray());
+        final int statusCode = response.statusCode();
+        logger.debug("Read object [{}] Range [{}] HTTP {}", downloadUri, 
range, statusCode);
+
+        // Set empty buffer when Google Cloud Storage cannot provide range 
requested
+        if (HTTP_RANGE_NOT_SATISFIABLE == statusCode) {
+            bufferStart = pos;
+            bufferLength = 0;
+            if (buffer == null) {
+                buffer = new byte[0];
+            }
+            return;
+        }
+
+        if (statusCode != HTTP_OK && statusCode != HTTP_PARTIAL) {
+            throw new IOException("Read object failed [%s] Range [%s] HTTP 
%d".formatted(location, range, statusCode));
+        }
+
+        // Set Content-Length when not provided
+        if (!contentLengthProvided) {
+            parseContentLength(response);
+        }
+
+        buffer = response.body();
+        bufferStart = rangeStart;
+        bufferLength = buffer.length;
+    }
+
+    private void parseContentLength(final HttpResponse<?> response) {
+        
response.headers().firstValue(CONTENT_RANGE.getHeader()).ifPresent(header -> {
+            final Matcher matcher = CONTENT_RANGE_TOTAL.matcher(header);
+            if (matcher.find()) {
+                final String contentRangeTotal = matcher.group(FIRST_GROUP);
+                contentLength = Long.parseLong(contentRangeTotal);
+                contentLengthProvided = true;
+            }
+        });
+    }
+
+    private void addDecryptionHeaders(final HttpRequest.Builder builder) {
+        if (decryptionKey != null && !decryptionKey.isBlank()) {
+            builder.header(ENCRYPTION_ALGORITHM.getHeader(), 
encryptionAlgorithm);
+            builder.header(ENCRYPTION_KEY.getHeader(), decryptionKey);
+            builder.header(ENCRYPTION_KEY_SHA256.getHeader(), 
KeyDigestProvider.getDigestEncoded(decryptionKey));
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpClientProvider.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpClientProvider.java
new file mode 100644
index 00000000000..3c2916ed01d
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpClientProvider.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.Set;
+
+import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
+import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.AUTHORIZATION;
+
+/**
+ * HTTP Client Provider encapsulates request and response methods with a 
configured HTTP Client and standard settings
+ */
+class HttpClientProvider {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpClientProvider.class);
+
+    private static final String BEARER_FORMAT = "Bearer %s";
+
+    /** Connect Timeout based on default setting from Google Cloud Storage 
library */
+    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(20);
+
+    /** Request Timeout provides failsafe behavior */
+    static final Duration REQUEST_TIMEOUT = Duration.ofMinutes(5);
+
+    private static final Duration DEFAULT_INITIAL_BACKOFF = 
Duration.ofSeconds(1);
+
+    private static final int SLEEP_MULTIPLIER = 5;
+
+    private static final int MAX_ATTEMPTS = 3;
+
+    private static final int HTTP_TOO_MANY_REQUESTS = 429;
+
+    /** HTTP status codes considered retriable aligned with Google Cloud 
Storage library */
+    static final Set<Integer> RETRIABLE_STATUS_CODES = Set.of(
+            HTTP_CLIENT_TIMEOUT,
+            HTTP_TOO_MANY_REQUESTS,
+            HTTP_INTERNAL_ERROR,
+            HTTP_BAD_GATEWAY,
+            HTTP_UNAVAILABLE,
+            HTTP_GATEWAY_TIMEOUT
+    );
+
+    private final String bearerToken;
+    private final HttpClient httpClient;
+    private final Duration initialBackoff;
+
+    HttpClientProvider(final String bearerToken) {
+        this(bearerToken, DEFAULT_INITIAL_BACKOFF);
+    }
+
+    HttpClientProvider(final String bearerToken, final Duration 
initialBackoff) {
+        this.bearerToken = bearerToken;
+        this.initialBackoff = initialBackoff;
+        this.httpClient = HttpClient.newBuilder()
+                .connectTimeout(CONNECT_TIMEOUT)
+                .build();
+    }
+
+    /**
+     * Send HTTP request with retry on status codes considered retriable 
according to Google Cloud Storage API
+     *
+     * @param request HTTP Request to be executed
+     * @param bodyHandler HTTP Response Body Handler
+     * @return HTTP Response
+     * @param <T> HTTP Response Body Type
+     * @throws IOException Thrown on failure after retries exhausted
+     */
+    <T> HttpResponse<T> send(final HttpRequest request, final 
HttpResponse.BodyHandler<T> bodyHandler) throws IOException {
+        HttpResponse<T> response = null;
+        IOException lastException = null;
+
+        final String requestMethod = request.method();
+        final URI requestUri = request.uri();
+        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
+            try {
+                response = httpClient.send(request, bodyHandler);
+
+                final int statusCode = response.statusCode();
+                if (RETRIABLE_STATUS_CODES.contains(statusCode) && attempt < 
MAX_ATTEMPTS) {
+                    logger.info("{} [{}] HTTP {} for attempt {} of {}", 
requestMethod, requestUri, statusCode, attempt, MAX_ATTEMPTS);
+                    sleep(attempt);
+                    continue;
+                }
+
+                return response;
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("%s [%s] request 
interrupted".formatted(requestMethod, requestUri), e);
+            } catch (final IOException e) {
+                lastException = e;
+                if (attempt < MAX_ATTEMPTS) {
+                    logger.info("{} [{}] request failed for attempt {} of {}", 
requestMethod, requestUri, attempt, MAX_ATTEMPTS, e);
+                    sleep(attempt);
+                }
+            }
+        }
+
+        if (response == null) {
+            throw lastException;
+        }
+
+        return response;
+    }
+
+    HttpRequest.Builder newRequestBuilder(final String uri) {
+        final HttpRequest.Builder builder = 
HttpRequest.newBuilder(URI.create(uri))
+                .timeout(REQUEST_TIMEOUT);
+        if (bearerToken != null) {
+            builder.header(AUTHORIZATION.getHeader(), 
BEARER_FORMAT.formatted(bearerToken));
+        }
+        return builder;
+    }
+
+    void close() {
+        httpClient.close();
+    }
+
+    private void sleep(final int attempt) throws IOException {
+        final long duration = initialBackoff.toMillis() * attempt * 
SLEEP_MULTIPLIER;
+        try {
+            Thread.sleep(duration);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Retry interrupted", e);
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpRequestException.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpRequestException.java
new file mode 100644
index 00000000000..61492999310
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpRequestException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+/**
+ * HTTP Request Exception indicating failures related to socket communication 
or request preparation
+ */
+class HttpRequestException extends RuntimeException {
+
+    HttpRequestException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpResponseException.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpResponseException.java
new file mode 100644
index 00000000000..77a0417595d
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpResponseException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+/**
+ * HTTP Response Exception indicating failures related to status codes or 
headers
+ */
+class HttpResponseException extends RuntimeException {
+
+    HttpResponseException(final String message) {
+        super(message);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProvider.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProvider.java
new file mode 100644
index 00000000000..1ed63acc3e2
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+/**
+ * Computes a Base64-encoded SHA-256 digest of a Base64-encoded encryption 
objectKey,
+ * as required by the GCS JSON API customer-supplied encryption objectKey 
headers.
+ */
+class KeyDigestProvider {
+
+    private static final String DIGEST_ALGORITHM = "SHA-256";
+
+    private KeyDigestProvider() {
+    }
+
+    static String getDigestEncoded(final String keyEncoded) {
+        try {
+            final byte[] keyDecoded = Base64.getDecoder().decode(keyEncoded);
+            final byte[] hash = 
MessageDigest.getInstance(DIGEST_ALGORITHM).digest(keyDecoded);
+            return Base64.getEncoder().encodeToString(hash);
+        } catch (final NoSuchAlgorithmException e) {
+            throw new IllegalStateException("SHA-256 not found", e);
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 00000000000..e29160e46bd
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.iceberg.gcs.GCSIcebergFileIOProvider
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProviderTest.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProviderTest.java
new file mode 100644
index 00000000000..8eb68f74dcb
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GCSIcebergFileIOProviderTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.apache.iceberg.io.FileIO;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.iceberg.ProviderContext;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.OAUTH2_TOKEN;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.SERVICE_HOST;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class GCSIcebergFileIOProviderTest {
+    private static final String SERVICE_ID = 
GCSIcebergFileIOProvider.class.getSimpleName();
+    private static final String ACCESS_TOKEN = "access-token";
+    private static final String LOCALHOST_URL = "http://localhost:9000";;
+
+    private TestRunner runner;
+
+    private GCSIcebergFileIOProvider provider;
+
+    @BeforeEach
+    void setProvider() throws InitializationException {
+        provider = new GCSIcebergFileIOProvider();
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService(SERVICE_ID, provider);
+    }
+
+    @AfterEach
+    void disableProvider() {
+        runner.disableControllerService(provider);
+    }
+
+    @Test
+    void testGetFileIO() {
+        runner.enableControllerService(provider);
+
+        final Map<String, String> properties = Map.of();
+        final ProviderContext providerContext = () -> properties;
+
+        try (FileIO fileIO = provider.getFileIO(providerContext)) {
+            assertNotNull(fileIO);
+            assertInstanceOf(GoogleCloudStorageFileIO.class, fileIO);
+        }
+    }
+
+    @Test
+    void testGetFileIOWithVendedToken() {
+        runner.enableControllerService(provider);
+
+        final Map<String, String> properties = Map.of(
+                OAUTH2_TOKEN.getProperty(), ACCESS_TOKEN,
+                SERVICE_HOST.getProperty(), LOCALHOST_URL
+        );
+        final ProviderContext providerContext = () -> properties;
+
+        try (FileIO fileIO = provider.getFileIO(providerContext)) {
+            assertNotNull(fileIO);
+            assertInstanceOf(GoogleCloudStorageFileIO.class, fileIO);
+            final Map<String, String> configuredProperties = 
fileIO.properties();
+            assertEquals(ACCESS_TOKEN, 
configuredProperties.get(OAUTH2_TOKEN.getProperty()));
+            assertEquals(LOCALHOST_URL, 
configuredProperties.get(SERVICE_HOST.getProperty()));
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIOTest.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIOTest.java
new file mode 100644
index 00000000000..376656e92b9
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIOTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import mockwebserver3.RecordedRequest;
+import okio.ByteString;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_NO_CONTENT;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PARTIAL;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.AUTHORIZATION;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.CONTENT_RANGE;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.LOCATION;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.OAUTH2_TOKEN;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.READ_CHUNK_SIZE_BYTES;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.SERVICE_HOST;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.USER_PROJECT;
+import static 
org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.WRITE_CHUNK_SIZE_BYTES;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class GoogleCloudStorageFileIOTest {
+
+    private static final String TEST_TOKEN = "test-token";
+    private static final String TOKEN = "token";
+    private static final String BEARER_TEST_TOKEN = "Bearer test-token";
+
+    private static final String DELETE_METHOD = "DELETE";
+    private static final String POST_METHOD = "POST";
+    private static final String PUT_METHOD = "PUT";
+
+    private static final String UPLOAD_SESSION_PATH = "/upload-session";
+
+    private static final String BUCKET_PATH_FILE_URI = 
"gs://bucket/path/to/file.parquet";
+    private static final String BUCKET_MISSING_FILE_URI = 
"gs://bucket/missing-file.parquet";
+    private static final String BUCKET_EXISTS_URI = 
"gs://bucket/exists.parquet";
+    private static final String BUCKET_MISSING_URI = 
"gs://bucket/missing.parquet";
+    private static final String BUCKET_FILE_PARQUET_URI = 
"gs://bucket/file.parquet";
+    private static final String BUCKET_OUTPUT_URI = 
"gs://bucket/output.parquet";
+    private static final String BUCKET_FILE_TXT_URI = "gs://bucket/file.txt";
+    private static final String BUCKET_KEY_URI = "gs://bucket/objectKey";
+
+    private static final String CONTENT_RANGE_FORMAT = "bytes %d-%d/%d";
+    private static final String SIZE_RESPONSE_FORMAT = """
+            {"size": "%d"}""";
+    private static final String SIZE_RESPONSE_12345 = """
+            {"size": "12345"}""";
+    private static final String USER_PROJECT_QUERY = "userProject=my-project";
+    private static final String MY_PROJECT = "my-project";
+
+    private static final String HELLO_WORLD = "hello world";
+    private static final String READ_CONTENT = "test file content for reading";
+    private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+    private static final String READ_CHUNK_SIZE_10 = "10";
+    private static final String LOCALHOST_URL_FORMAT = "http://localhost:%d";;
+
+    private static final long EXPECTED_LENGTH = 12345L;
+    private static final long KNOWN_LENGTH = 42L;
+
+    private MockWebServer mockWebServer;
+    private GoogleCloudStorageFileIO fileIO;
+    private String baseUrl;
+
+    @BeforeEach
+    void setServer() throws IOException {
+        mockWebServer = new MockWebServer();
+        mockWebServer.start();
+        baseUrl = LOCALHOST_URL_FORMAT.formatted(mockWebServer.getPort());
+    }
+
+    @AfterEach
+    void closeServer() {
+        if (fileIO != null) {
+            fileIO.close();
+        }
+        mockWebServer.close();
+    }
+
+    @Test
+    void testInitializeProperties() {
+        fileIO = new GoogleCloudStorageFileIO();
+        final Map<String, String> props = Map.of(
+                OAUTH2_TOKEN.getProperty(), TEST_TOKEN,
+                SERVICE_HOST.getProperty(), baseUrl
+        );
+        fileIO.initialize(props);
+
+        assertEquals(props, fileIO.properties());
+    }
+
+    @Test
+    void testDeleteFile() throws InterruptedException {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_NO_CONTENT)
+                .build());
+
+        fileIO = createFileIO(TEST_TOKEN);
+        fileIO.deleteFile(BUCKET_PATH_FILE_URI);
+
+        final RecordedRequest request = mockWebServer.takeRequest();
+        assertEquals(DELETE_METHOD, request.getMethod());
+        assertEquals(BEARER_TEST_TOKEN, 
request.getHeaders().get(AUTHORIZATION.getHeader()));
+    }
+
+    @Test
+    void testDeleteFileNotFoundIgnored() {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_NOT_FOUND)
+                .build());
+
+        fileIO = createFileIO(null);
+        fileIO.deleteFile(BUCKET_MISSING_FILE_URI);
+    }
+
+    @Test
+    void testDeleteFileFailedForbidden() {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_FORBIDDEN)
+                .build());
+
+        fileIO = createFileIO(null);
+        assertThrows(HttpResponseException.class, () -> 
fileIO.deleteFile(BUCKET_PATH_FILE_URI));
+    }
+
+    @Test
+    void testDeleteFileRequestFailed() {
+        mockWebServer.close();
+
+        fileIO = createFileIO(null);
+        assertThrows(HttpRequestException.class, () -> 
fileIO.deleteFile(BUCKET_PATH_FILE_URI));
+    }
+
+    @Test
+    void testInputFileExists() {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_OK)
+                .build());
+
+        fileIO = createFileIO(TOKEN);
+        assertTrue(fileIO.newInputFile(BUCKET_EXISTS_URI).exists());
+    }
+
+    @Test
+    void testInputFileNotExists() {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_NOT_FOUND)
+                .build());
+
+        fileIO = createFileIO(null);
+        assertFalse(fileIO.newInputFile(BUCKET_MISSING_URI).exists());
+    }
+
+    @Test
+    void testInputFileGetLength() {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_OK)
+                .body(SIZE_RESPONSE_12345)
+                .build());
+
+        fileIO = createFileIO(TOKEN);
+        assertEquals(EXPECTED_LENGTH, 
fileIO.newInputFile(BUCKET_FILE_PARQUET_URI).getLength());
+    }
+
+    @Test
+    void testInputFileKnownLength() {
+        fileIO = createFileIO(null);
+
+        final InputFile input = fileIO.newInputFile(BUCKET_FILE_PARQUET_URI, 
KNOWN_LENGTH);
+        assertEquals(KNOWN_LENGTH, input.getLength());
+    }
+
+    @Test
+    void testOutputFileCreateAndWrite() throws IOException, 
InterruptedException {
+        final String sessionUrl = baseUrl + UPLOAD_SESSION_PATH;
+
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_OK)
+                .addHeader(LOCATION.getHeader(), sessionUrl)
+                .build());
+
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_OK)
+                .build());
+
+        fileIO = createFileIO(TOKEN, Map.of(
+                WRITE_CHUNK_SIZE_BYTES.getProperty(), 
String.valueOf(GoogleCloudStorageProperties.DEFAULT_WRITE_CHUNK_SIZE)
+        ));
+
+        final OutputFile outputFile = fileIO.newOutputFile(BUCKET_OUTPUT_URI);
+        assertNotNull(outputFile.location());
+
+        final byte[] data = HELLO_WORLD.getBytes(StandardCharsets.UTF_8);
+        try (PositionOutputStream out = outputFile.createOrOverwrite()) {
+            out.write(data);
+            assertEquals(data.length, out.getPos());
+        }
+
+        final RecordedRequest initRequest = mockWebServer.takeRequest();
+        assertEquals(POST_METHOD, initRequest.getMethod());
+
+        final RecordedRequest uploadRequest = mockWebServer.takeRequest();
+        assertEquals(PUT_METHOD, uploadRequest.getMethod());
+
+        final ByteString requestBody = uploadRequest.getBody();
+        assertNotNull(requestBody);
+        assertArrayEquals(data, requestBody.toByteArray());
+    }
+
+    @Test
+    void testReadClose() throws IOException {
+        final byte[] content = READ_CONTENT.getBytes(StandardCharsets.UTF_8);
+
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_OK)
+                .body(SIZE_RESPONSE_FORMAT.formatted(content.length))
+                .build());
+
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_PARTIAL)
+                .addHeader(CONTENT_RANGE.getHeader(), 
CONTENT_RANGE_FORMAT.formatted(0, content.length - 1, content.length))
+                .body(READ_CONTENT)
+                .build());
+
+        fileIO = createFileIO(TOKEN);
+
+        final InputFile inputFile = fileIO.newInputFile(BUCKET_FILE_TXT_URI);
+        assertEquals(content.length, inputFile.getLength());
+
+        try (SeekableInputStream stream = inputFile.newStream()) {
+            final byte[] result = new byte[content.length];
+            int totalRead = 0;
+            while (totalRead < content.length) {
+                final int n = stream.read(result, totalRead, content.length - 
totalRead);
+                if (n < 0) {
+                    break;
+                }
+                totalRead += n;
+            }
+            assertEquals(content.length, totalRead);
+            assertArrayEquals(content, result);
+            assertEquals(content.length, stream.getPos());
+        }
+    }
+
+    @Test
+    void testSeekableInputStreamSeek() throws IOException {
+        final byte[] content = ALPHABET.getBytes(StandardCharsets.UTF_8);
+
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_PARTIAL)
+                .addHeader(CONTENT_RANGE.getHeader(), 
CONTENT_RANGE_FORMAT.formatted(10, 19, content.length))
+                .body(ALPHABET.substring(10, 20))
+                .build());
+
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_PARTIAL)
+                .addHeader(CONTENT_RANGE.getHeader(), 
CONTENT_RANGE_FORMAT.formatted(25, 25, content.length))
+                .body(ALPHABET.substring(25, 26))
+                .build());
+
+        fileIO = createFileIO(TOKEN, Map.of(
+                READ_CHUNK_SIZE_BYTES.getProperty(), READ_CHUNK_SIZE_10
+        ));
+
+        try (SeekableInputStream stream = 
fileIO.newInputFile(BUCKET_FILE_TXT_URI, content.length).newStream()) {
+            assertEquals(0, stream.getPos());
+
+            stream.seek(10);
+            assertEquals(10, stream.getPos());
+            assertEquals('K', stream.read());
+            assertEquals(11, stream.getPos());
+
+            stream.seek(25);
+            assertEquals('Z', stream.read());
+            assertEquals(-1, stream.read());
+        }
+    }
+
+    @Test
+    void testUserProjectQueryParam() {
+        final GoogleCloudStorageProperties storageProperties = new 
GoogleCloudStorageProperties(Map.of(
+                SERVICE_HOST.getProperty(), baseUrl,
+                USER_PROJECT.getProperty(), MY_PROJECT
+        ));
+
+        final GoogleCloudStorageLocation loc = 
GoogleCloudStorageLocation.parse(BUCKET_KEY_URI);
+        
assertTrue(storageProperties.metadataUri(loc).contains(USER_PROJECT_QUERY));
+        
assertTrue(storageProperties.uploadUri(loc).contains(USER_PROJECT_QUERY));
+        
assertTrue(storageProperties.downloadUri(loc).contains(USER_PROJECT_QUERY));
+    }
+
+    private GoogleCloudStorageFileIO createFileIO(final String token) {
+        return createFileIO(token, Map.of());
+    }
+
+    private GoogleCloudStorageFileIO createFileIO(final String token, final 
Map<String, String> extraProps) {
+        final GoogleCloudStorageFileIO io = new GoogleCloudStorageFileIO();
+        final Map<String, String> props = new HashMap<>();
+        props.put(SERVICE_HOST.getProperty(), baseUrl);
+        if (token != null) {
+            props.put(OAUTH2_TOKEN.getProperty(), token);
+        }
+        props.putAll(extraProps);
+        io.initialize(props);
+        return io;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocationTest.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocationTest.java
new file mode 100644
index 00000000000..2eb1cb63b10
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageLocationTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class GoogleCloudStorageLocationTest {
+
+    private static final String OBJECT_BUCKET = "object-bucket";
+    private static final String SIMPLE_KEY = "path/to/object.parquet";
+    private static final String OBJECT_URI = 
"gs://%s/%s".formatted(OBJECT_BUCKET, SIMPLE_KEY);
+    private static final String BUCKET = "bucket";
+    private static final String OBJECT_PARQUET = "object.parquet";
+    private static final String SINGLE_SEGMENT_URI = 
"gs://%s/%s".formatted(BUCKET, OBJECT_PARQUET);
+    private static final String DEEP_KEY = "db/table/data/00000-0-abc.parquet";
+    private static final String DEEP_PATH_URI = 
"gs://bucket/%s".formatted(DEEP_KEY);
+
+    private static final String MULTI_SEGMENT_URI = "gs://bucket/a/b/c";
+    private static final String SPACES_URI = "gs://bucket/path with 
spaces/file name.txt";
+    private static final String ENCODED_MULTI_SEGMENT = "a%2Fb%2Fc";
+    private static final String ENCODED_SPACES = 
"path%20with%20spaces%2Ffile%20name.txt";
+
+    private static final String SIMPLE_BUCKET_KEY_URI = 
"gs://bucket/objectKey";
+    private static final String INVALID_SCHEME_URI = "s3://bucket/objectKey";
+    private static final String NO_BUCKET_URI = "gs:///objectKey";
+    private static final String NO_KEY_URI = "gs://bucket/";
+    private static final String NO_SLASH_URI = "gs://bucket";
+
+    @Test
+    void testParseSimple() {
+        final GoogleCloudStorageLocation location = 
GoogleCloudStorageLocation.parse(OBJECT_URI);
+        assertEquals(OBJECT_BUCKET, location.bucket());
+        assertEquals(SIMPLE_KEY, location.objectKey());
+    }
+
+    @Test
+    void testParseSingleSegmentKey() {
+        final GoogleCloudStorageLocation location = 
GoogleCloudStorageLocation.parse(SINGLE_SEGMENT_URI);
+        assertEquals(BUCKET, location.bucket());
+        assertEquals(OBJECT_PARQUET, location.objectKey());
+    }
+
+    @Test
+    void testParseDeepPath() {
+        final GoogleCloudStorageLocation location = 
GoogleCloudStorageLocation.parse(DEEP_PATH_URI);
+        assertEquals(BUCKET, location.bucket());
+        assertEquals(DEEP_KEY, location.objectKey());
+    }
+
+    @Test
+    void testEncodedKeyEncodesSlashes() {
+        final GoogleCloudStorageLocation location = 
GoogleCloudStorageLocation.parse(MULTI_SEGMENT_URI);
+        assertEquals(ENCODED_MULTI_SEGMENT, location.encodedObjectKey());
+    }
+
+    @Test
+    void testEncodedKeyEncodesSpaces() {
+        final GoogleCloudStorageLocation location = 
GoogleCloudStorageLocation.parse(SPACES_URI);
+        assertEquals(ENCODED_SPACES, location.encodedObjectKey());
+    }
+
+    @Test
+    void testToString() {
+        final GoogleCloudStorageLocation location = 
GoogleCloudStorageLocation.parse(SIMPLE_BUCKET_KEY_URI);
+        assertEquals(SIMPLE_BUCKET_KEY_URI, location.toString());
+    }
+
+    @Test
+    void testParseNullThrows() {
+        assertThrows(IllegalArgumentException.class, () -> 
GoogleCloudStorageLocation.parse(null));
+    }
+
+    @Test
+    void testParseInvalidSchemeThrows() {
+        assertThrows(IllegalArgumentException.class, () -> 
GoogleCloudStorageLocation.parse(INVALID_SCHEME_URI));
+    }
+
+    @Test
+    void testParseNoBucketThrows() {
+        assertThrows(IllegalArgumentException.class, () -> 
GoogleCloudStorageLocation.parse(NO_BUCKET_URI));
+    }
+
+    @Test
+    void testParseNoKeyThrows() {
+        assertThrows(IllegalArgumentException.class, () -> 
GoogleCloudStorageLocation.parse(NO_KEY_URI));
+    }
+
+    @Test
+    void testParseNoSlashThrows() {
+        assertThrows(IllegalArgumentException.class, () -> 
GoogleCloudStorageLocation.parse(NO_SLASH_URI));
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/HttpClientProviderTest.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/HttpClientProviderTest.java
new file mode 100644
index 00000000000..4479a308bdf
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/HttpClientProviderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+
+import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY;
+import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class HttpClientProviderTest {
+
+    private static final int HTTP_TOO_MANY_REQUESTS = 429;
+
+    private static final Duration TEST_BACKOFF = Duration.ofMillis(10);
+
+    private static final String LOCALHOST_URI_FORMAT = 
"http://localhost:%d/test";;
+
+    private static final String GET_METHOD = "GET";
+
+    private static final int EXPECTED_REQUESTS_WITH_RETRY = 2;
+
+    private static final int EXPECTED_REQUESTS_RETRIES_EXHAUSTED = 3;
+
+    private MockWebServer mockWebServer;
+
+    private HttpClientProvider httpClientProvider;
+
+    @BeforeEach
+    void startServer() throws IOException {
+        mockWebServer = new MockWebServer();
+        mockWebServer.start();
+        httpClientProvider = new HttpClientProvider(null, TEST_BACKOFF);
+    }
+
+    @AfterEach
+    void closeServer() {
+        httpClientProvider.close();
+        mockWebServer.close();
+    }
+
+    @Test
+    void testSendSuccess() throws IOException {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_OK)
+                .build());
+
+        final HttpResponse<String> response = sendGetRequest();
+
+        assertEquals(HTTP_OK, response.statusCode());
+        assertEquals(1, mockWebServer.getRequestCount());
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {HTTP_CLIENT_TIMEOUT, HTTP_TOO_MANY_REQUESTS, 
HTTP_INTERNAL_ERROR, HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT})
+    void testSendRetriableStatusCodeRetried(final int statusCode) throws 
IOException {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(statusCode)
+                .build());
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_OK)
+                .build());
+
+        final HttpResponse<String> response = sendGetRequest();
+
+        assertEquals(HTTP_OK, response.statusCode());
+        assertEquals(EXPECTED_REQUESTS_WITH_RETRY, 
mockWebServer.getRequestCount());
+    }
+
+    @Test
+    void testSendForbiddenStatusCodeNotRetried() throws IOException {
+        mockWebServer.enqueue(new MockResponse.Builder()
+                .code(HTTP_FORBIDDEN)
+                .build());
+
+        final HttpResponse<String> response = sendGetRequest();
+
+        assertEquals(HTTP_FORBIDDEN, response.statusCode());
+        assertEquals(1, mockWebServer.getRequestCount());
+    }
+
+    @Test
+    void testSendRetriesExhaustedReturnsLastResponse() throws IOException {
+        for (int i = 0; i < EXPECTED_REQUESTS_RETRIES_EXHAUSTED; i++) {
+            mockWebServer.enqueue(new MockResponse.Builder()
+                    .code(HTTP_UNAVAILABLE)
+                    .build());
+        }
+
+        final HttpResponse<String> response = sendGetRequest();
+
+        assertEquals(HTTP_UNAVAILABLE, response.statusCode());
+        assertEquals(EXPECTED_REQUESTS_RETRIES_EXHAUSTED, 
mockWebServer.getRequestCount());
+    }
+
+    @Test
+    void testSendException() {
+        mockWebServer.close();
+
+        assertThrows(IOException.class, this::sendGetRequest);
+    }
+
+    private HttpResponse<String> sendGetRequest() throws IOException {
+        final String uri = 
LOCALHOST_URI_FORMAT.formatted(mockWebServer.getPort());
+        final HttpRequest request = httpClientProvider.newRequestBuilder(uri)
+                .method(GET_METHOD, HttpRequest.BodyPublishers.noBody())
+                .build();
+        return httpClientProvider.send(request, 
HttpResponse.BodyHandlers.ofString());
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProviderTest.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProviderTest.java
new file mode 100644
index 00000000000..4779fae7efe
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/test/java/org/apache/nifi/services/iceberg/gcs/KeyDigestProviderTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.gcs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class KeyDigestProviderTest {
+    private static final String INPUT_KEY = "0102030405060708";
+
+    private static final String DIGEST_EXPECTED = 
"Kt9FTK2ulLNLyGIXrZ0ZGAUKQwOQNpffViNE76oTgp0=";
+
+    @Test
+    void testGetDigestEncoded() {
+        final String digestEncoded = 
KeyDigestProvider.getDigestEncoded(INPUT_KEY);
+
+        assertEquals(DIGEST_EXPECTED, digestEncoded);
+    }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/pom.xml 
b/nifi-extension-bundles/nifi-iceberg-bundle/pom.xml
index 4c34589558b..518309c079c 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/pom.xml
@@ -33,6 +33,8 @@
         <module>nifi-iceberg-aws-nar</module>
         <module>nifi-iceberg-azure</module>
         <module>nifi-iceberg-azure-nar</module>
+        <module>nifi-iceberg-gcs</module>
+        <module>nifi-iceberg-gcs-nar</module>
         <module>nifi-iceberg-parquet-writer</module>
         <module>nifi-iceberg-parquet-writer-nar</module>
         <module>nifi-iceberg-processors</module>

Reply via email to