This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 42f3d01e6c Move all analyticscore references behind a 
AnalyticsCoreUtil class (#16258)
42f3d01e6c is described below

commit 42f3d01e6ceaf84ae2810bf902d9cb7bcc4c4574
Author: Talat UYARER <[email protected]>
AuthorDate: Tue May 12 13:00:41 2026 -0700

    Move all analyticscore references behind a AnalyticsCoreUtil class (#16258)
---
 .../org/apache/iceberg/gcp/gcs/TestGcsFileIO.java  |   2 +-
 .../apache/iceberg/gcp/gcs/AnalyticsCoreUtil.java  | 216 +++++++++++++++++++++
 .../org/apache/iceberg/gcp/gcs/BaseGCSFile.java    |   9 +-
 .../org/apache/iceberg/gcp/gcs/GCSInputFile.java   |  44 +----
 .../org/apache/iceberg/gcp/gcs/GCSOutputFile.java  |   5 +-
 .../iceberg/gcp/gcs/GcsInputStreamWrapper.java     | 143 --------------
 .../apache/iceberg/gcp/gcs/PrefixedStorage.java    |  65 +++----
 .../iceberg/gcp/gcs/TestAnalyticsCoreUtil.java     |  83 ++++++++
 .../apache/iceberg/gcp/gcs/TestGcsInputFile.java   |   6 +-
 .../iceberg/gcp/gcs/TestGcsInputStreamWrapper.java | 150 --------------
 .../iceberg/gcp/gcs/TestPrefixedStorage.java       |  28 ++-
 11 files changed, 366 insertions(+), 385 deletions(-)

diff --git 
a/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java 
b/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java
index 626aacd17d..1e956854b3 100644
--- a/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java
+++ b/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java
@@ -163,7 +163,7 @@ public class TestGcsFileIO {
       IOUtil.readFully(is, actual, 0, expected.length);
     }
 
-    assertThat(inputStream).isInstanceOf(GcsInputStreamWrapper.class);
+    assertThat(inputStream).isNotInstanceOf(GCSInputStream.class);
     assertThat(actual).isEqualTo(expected);
   }
 
diff --git 
a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/AnalyticsCoreUtil.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/AnalyticsCoreUtil.java
new file mode 100644
index 0000000000..4bc9659d77
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/AnalyticsCoreUtil.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.auth.Credentials;
+import com.google.cloud.gcs.analyticscore.client.GcsFileInfo;
+import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
+import com.google.cloud.gcs.analyticscore.client.GcsFileSystemImpl;
+import com.google.cloud.gcs.analyticscore.client.GcsFileSystemOptions;
+import com.google.cloud.gcs.analyticscore.client.GcsItemId;
+import com.google.cloud.gcs.analyticscore.client.GcsItemInfo;
+import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
+import com.google.cloud.gcs.analyticscore.core.GcsAnalyticsCoreOptions;
+import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
+import com.google.cloud.storage.BlobId;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.Counter;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.PropertyUtil;
+
+/**
+ * Gateway to the optional {@code com.google.cloud.gcs.analyticscore.*} 
dependency. All references
+ * to analytics-core types are confined to this class so that it is loaded 
only when {@link
+ * org.apache.iceberg.gcp.GCPProperties#GCS_ANALYTICS_CORE_ENABLED} is true.
+ */
+class AnalyticsCoreUtil {
+
+  private AnalyticsCoreUtil() {}
+
+  static AutoCloseable createFileSystem(Map<String, String> properties, 
Credentials credentials) {
+    Preconditions.checkState(
+        PropertyUtil.propertyAsBoolean(properties, 
GCPProperties.GCS_ANALYTICS_CORE_ENABLED, false),
+        "GCS analytics-core is disabled; %s must be set to true",
+        GCPProperties.GCS_ANALYTICS_CORE_ENABLED);
+    GcsAnalyticsCoreOptions options = new GcsAnalyticsCoreOptions("gcs.", 
properties);
+    GcsFileSystemOptions fileSystemOptions = options.getGcsFileSystemOptions();
+    return credentials == null
+        ? new GcsFileSystemImpl(fileSystemOptions)
+        : new GcsFileSystemImpl(credentials, fileSystemOptions);
+  }
+
+  static SeekableInputStream newStream(
+      AutoCloseable fileSystemHandle, BlobId blobId, Long blobSize, 
MetricsContext metrics)
+      throws IOException {
+    GcsFileSystem fileSystem = (GcsFileSystem) fileSystemHandle;
+    GcsItemId itemId = gcsItemId(blobId);
+    GoogleCloudStorageInputStream stream =
+        blobSize == null
+            ? GoogleCloudStorageInputStream.create(fileSystem, itemId)
+            : GoogleCloudStorageInputStream.create(
+                fileSystem, gcsFileInfo(blobId, itemId, blobSize));
+    return new GcsInputStreamWrapper(stream, blobId, metrics);
+  }
+
+  static void close(AutoCloseable fileSystemHandle) {
+    if (fileSystemHandle != null) {
+      ((GcsFileSystem) fileSystemHandle).close();
+    }
+  }
+
+  private static GcsItemId gcsItemId(BlobId blobId) {
+    GcsItemId.Builder builder =
+        
GcsItemId.builder().setBucketName(blobId.getBucket()).setObjectName(blobId.getName());
+    if (blobId.getGeneration() != null) {
+      builder.setContentGeneration(blobId.getGeneration());
+    }
+
+    return builder.build();
+  }
+
+  private static GcsFileInfo gcsFileInfo(BlobId blobId, GcsItemId itemId, long 
size) {
+    GcsItemInfo itemInfo = 
GcsItemInfo.builder().setItemId(itemId).setSize(size).build();
+    return GcsFileInfo.builder()
+        .setItemInfo(itemInfo)
+        .setUri(URI.create(blobId.toGsUtilUri()))
+        .setAttributes(ImmutableMap.of())
+        .build();
+  }
+
+  private static class GcsInputStreamWrapper extends SeekableInputStream 
implements RangeReadable {
+    private final Counter readBytes;
+    private final Counter readOperations;
+    private final GoogleCloudStorageInputStream stream;
+    private final BlobId blobId;
+
+    GcsInputStreamWrapper(
+        GoogleCloudStorageInputStream stream, BlobId blobId, MetricsContext 
metrics) {
+      Preconditions.checkArgument(null != stream, "Invalid input stream : 
null");
+      Preconditions.checkArgument(null != blobId, "Invalid blobId : null");
+      this.stream = stream;
+      this.blobId = blobId;
+      this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, 
MetricsContext.Unit.BYTES);
+      this.readOperations = 
metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return stream.getPos();
+    }
+
+    @Override
+    public void seek(long newPos) throws IOException {
+      stream.seek(newPos);
+    }
+
+    @Override
+    public int read() throws IOException {
+      int readByte;
+      try {
+        readByte = stream.read();
+      } catch (IOException e) {
+        GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+        throw e;
+      }
+      readBytes.increment();
+      readOperations.increment();
+      return readByte;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int bytesRead;
+      try {
+        bytesRead = stream.read(b, off, len);
+      } catch (IOException e) {
+        GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+        throw e;
+      }
+      if (bytesRead > 0) {
+        readBytes.increment(bytesRead);
+      }
+      readOperations.increment();
+      return bytesRead;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int 
length) throws IOException {
+      try {
+        stream.readFully(position, buffer, offset, length);
+      } catch (IOException e) {
+        GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+        throw e;
+      }
+    }
+
+    @Override
+    public int readTail(byte[] buffer, int offset, int length) throws 
IOException {
+      try {
+        return stream.readTail(buffer, offset, length);
+      } catch (IOException e) {
+        GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+        throw e;
+      }
+    }
+
+    @Override
+    public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> 
allocate)
+        throws IOException {
+      List<GcsObjectRange> objectRanges =
+          ranges.stream()
+              .map(
+                  fileRange ->
+                      GcsObjectRange.builder()
+                          .setOffset(fileRange.offset())
+                          .setLength(fileRange.length())
+                          .setByteBufferFuture(fileRange.byteBuffer())
+                          .build())
+              .collect(Collectors.toList());
+      try {
+        stream.readVectored(objectRanges, allocate);
+      } catch (IOException e) {
+        GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+        throw e;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      stream.close();
+    }
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
index 4ece6a2156..2a1440f3b4 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.gcp.gcs;
 
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
@@ -28,7 +27,9 @@ import org.apache.iceberg.metrics.MetricsContext;
 
 abstract class BaseGCSFile {
   private final Storage storage;
-  private final GcsFileSystem gcsFileSystem;
+  // Using AutoCloseable avoids a runtime dependency on gcs-analytics-core. 
Cast via
+  // AnalyticsCoreUtil.
+  private final AutoCloseable gcsFileSystem;
   private final GCPProperties gcpProperties;
   private final BlobId blobId;
   private Blob metadata;
@@ -36,7 +37,7 @@ abstract class BaseGCSFile {
 
   BaseGCSFile(
       Storage storage,
-      GcsFileSystem gcsFileSystem,
+      AutoCloseable gcsFileSystem,
       BlobId blobId,
       GCPProperties gcpProperties,
       MetricsContext metrics) {
@@ -55,7 +56,7 @@ abstract class BaseGCSFile {
     return storage;
   }
 
-  GcsFileSystem gcsFileSystem() {
+  AutoCloseable gcsFileSystem() {
     return gcsFileSystem;
   }
 
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
index 12dc71b5a1..9f1576f63d 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -18,20 +18,13 @@
  */
 package org.apache.iceberg.gcp.gcs;
 
-import com.google.cloud.gcs.analyticscore.client.GcsFileInfo;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
-import com.google.cloud.gcs.analyticscore.client.GcsItemId;
-import com.google.cloud.gcs.analyticscore.client.GcsItemInfo;
-import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import java.io.IOException;
-import java.net.URI;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.metrics.MetricsContext;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +50,9 @@ class GCSInputFile extends BaseGCSFile implements InputFile {
 
   GCSInputFile(
       Storage storage,
-      GcsFileSystem gcsFileSystem,
+      // Using AutoCloseable avoids a runtime dependency on 
gcs-analytics-core. Cast via
+      // AnalyticsCoreUtil.
+      AutoCloseable gcsFileSystem,
       BlobId blobId,
       Long blobSize,
       GCPProperties gcpProperties,
@@ -79,7 +74,7 @@ class GCSInputFile extends BaseGCSFile implements InputFile {
   public SeekableInputStream newStream() {
     if (gcpProperties().isGcsAnalyticsCoreEnabled()) {
       try {
-        return newGoogleCloudStorageInputStream();
+        return AnalyticsCoreUtil.newStream(gcsFileSystem(), blobId(), 
blobSize, metrics());
       } catch (IOException e) {
         LOG.error(
             "Failed to create GCS analytics core input stream for {}, falling 
back to default.",
@@ -90,35 +85,4 @@ class GCSInputFile extends BaseGCSFile implements InputFile {
 
     return new GCSInputStream(storage(), blobId(), blobSize, gcpProperties(), 
metrics());
   }
-
-  private SeekableInputStream newGoogleCloudStorageInputStream() throws 
IOException {
-    if (null == blobSize) {
-      return new GcsInputStreamWrapper(
-          GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsItemId()), 
blobId(), metrics());
-    }
-
-    return new GcsInputStreamWrapper(
-        GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsFileInfo()), 
blobId(), metrics());
-  }
-
-  private GcsItemId gcsItemId() {
-    BlobId blobId = blobId();
-    GcsItemId.Builder builder =
-        
GcsItemId.builder().setBucketName(blobId.getBucket()).setObjectName(blobId.getName());
-    if (blobId.getGeneration() != null) {
-      builder.setContentGeneration(blobId.getGeneration());
-    }
-
-    return builder.build();
-  }
-
-  private GcsFileInfo gcsFileInfo() {
-    GcsItemId itemId = gcsItemId();
-    GcsItemInfo itemInfo = 
GcsItemInfo.builder().setItemId(itemId).setSize(getLength()).build();
-    return GcsFileInfo.builder()
-        .setItemInfo(itemInfo)
-        .setUri(URI.create(blobId().toGsUtilUri()))
-        .setAttributes(ImmutableMap.of())
-        .build();
-  }
 }
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
index aa9f038db7..18bf53b791 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.gcp.gcs;
 
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import java.io.IOException;
@@ -44,7 +43,9 @@ class GCSOutputFile extends BaseGCSFile implements OutputFile 
{
 
   GCSOutputFile(
       Storage storage,
-      GcsFileSystem gcsFileSystem,
+      // Using AutoCloseable avoids a runtime dependency on 
gcs-analytics-core. Cast via
+      // AnalyticsCoreUtil.
+      AutoCloseable gcsFileSystem,
       BlobId blobId,
       GCPProperties gcpProperties,
       MetricsContext metrics) {
diff --git 
a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java
deleted file mode 100644
index 25ba7662dd..0000000000
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.gcp.gcs;
-
-import com.google.api.client.util.Preconditions;
-import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
-import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
-import com.google.cloud.storage.BlobId;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.function.IntFunction;
-import java.util.stream.Collectors;
-import org.apache.iceberg.io.FileIOMetricsContext;
-import org.apache.iceberg.io.FileRange;
-import org.apache.iceberg.io.RangeReadable;
-import org.apache.iceberg.io.SeekableInputStream;
-import org.apache.iceberg.metrics.Counter;
-import org.apache.iceberg.metrics.MetricsContext;
-
-class GcsInputStreamWrapper extends SeekableInputStream implements 
RangeReadable {
-  private final Counter readBytes;
-  private final Counter readOperations;
-  private final GoogleCloudStorageInputStream stream;
-  private final BlobId blobId;
-
-  GcsInputStreamWrapper(
-      GoogleCloudStorageInputStream stream, BlobId blobId, MetricsContext 
metrics) {
-    Preconditions.checkArgument(null != stream, "Invalid input stream : null");
-    Preconditions.checkArgument(null != blobId, "Invalid blobId : null");
-    this.stream = stream;
-    this.blobId = blobId;
-    this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, 
MetricsContext.Unit.BYTES);
-    this.readOperations = 
metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return stream.getPos();
-  }
-
-  @Override
-  public void seek(long newPos) throws IOException {
-    stream.seek(newPos);
-  }
-
-  @Override
-  public int read() throws IOException {
-    int readByte;
-    try {
-      readByte = stream.read();
-    } catch (IOException e) {
-      GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
-      throw e;
-    }
-    readBytes.increment();
-    readOperations.increment();
-    return readByte;
-  }
-
-  @Override
-  public int read(byte[] b) throws IOException {
-    return read(b, 0, b.length);
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    int bytesRead;
-    try {
-      bytesRead = stream.read(b, off, len);
-    } catch (IOException e) {
-      GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
-      throw e;
-    }
-    if (bytesRead > 0) {
-      readBytes.increment(bytesRead);
-    }
-    readOperations.increment();
-    return bytesRead;
-  }
-
-  @Override
-  public void readFully(long position, byte[] buffer, int offset, int length) 
throws IOException {
-    try {
-      stream.readFully(position, buffer, offset, length);
-    } catch (IOException e) {
-      GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
-      throw e;
-    }
-  }
-
-  @Override
-  public int readTail(byte[] buffer, int offset, int length) throws 
IOException {
-    try {
-      return stream.readTail(buffer, offset, length);
-    } catch (IOException e) {
-      GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
-      throw e;
-    }
-  }
-
-  @Override
-  public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> 
allocate)
-      throws IOException {
-    List<GcsObjectRange> objectRanges =
-        ranges.stream()
-            .map(
-                fileRange ->
-                    GcsObjectRange.builder()
-                        .setOffset(fileRange.offset())
-                        .setLength(fileRange.length())
-                        .setByteBufferFuture(fileRange.byteBuffer())
-                        .build())
-            .collect(Collectors.toList());
-    try {
-      stream.readVectored(objectRanges, allocate);
-    } catch (IOException e) {
-      GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
-      throw e;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    stream.close();
-  }
-}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
index 06017bb2e8..a442269c09 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
@@ -23,10 +23,6 @@ import com.google.auth.Credentials;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.auth.oauth2.ImpersonatedCredentials;
 import com.google.cloud.NoCredentials;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystemImpl;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystemOptions;
-import com.google.cloud.gcs.analyticscore.core.GcsAnalyticsCoreOptions;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import java.io.IOException;
@@ -45,11 +41,11 @@ class PrefixedStorage implements AutoCloseable {
   private static final String GCS_FILE_IO_USER_AGENT = "gcsfileio/" + 
EnvironmentContext.get();
   private final String storagePrefix;
   private final GCPProperties gcpProperties;
+  private final Map<String, String> propertiesWithUserAgent;
   private SerializableSupplier<Storage> storage;
   private CloseableGroup closeableGroup;
   private transient volatile Storage storageClient;
-  private final SerializableSupplier<GcsFileSystem> gcsFileSystemSupplier;
-  private transient volatile GcsFileSystem gcsFileSystem;
+  private transient volatile AutoCloseable gcsFileSystem;
 
   PrefixedStorage(
       String storagePrefix, Map<String, String> properties, 
SerializableSupplier<Storage> storage) {
@@ -59,6 +55,11 @@ class PrefixedStorage implements AutoCloseable {
     this.storagePrefix = storagePrefix;
     this.storage = storage;
     this.gcpProperties = new GCPProperties(properties);
+    this.propertiesWithUserAgent =
+        ImmutableMap.<String, String>builder()
+            .putAll(properties)
+            .put("gcs.user-agent", GCS_FILE_IO_USER_AGENT)
+            .build();
     this.closeableGroup = new CloseableGroup();
     if (null == storage) {
       this.storage =
@@ -81,8 +82,6 @@ class PrefixedStorage implements AutoCloseable {
             return builder.build().getService();
           };
     }
-
-    this.gcsFileSystemSupplier = gcsFileSystemSupplier(properties);
   }
 
   public String storagePrefix() {
@@ -107,31 +106,45 @@ class PrefixedStorage implements AutoCloseable {
 
   @Override
   public void close() {
-    if (null != closeableGroup) {
+    try {
       try {
-        closeableGroup.close();
+        if (null != closeableGroup) {
+          closeableGroup.close();
+        }
       } catch (IOException ioe) {
         throw new UncheckedIOException(ioe);
+      } finally {
+        if (null != gcsFileSystem) {
+          AnalyticsCoreUtil.close(gcsFileSystem);
+          gcsFileSystem = null;
+        }
+      }
+    } finally {
+      if (null != storage) {
+        // GCS Storage does not appear to be closable, so release the reference
+        storage = null;
       }
     }
+  }
 
-    if (null != storage) {
-      // GCS Storage does not appear to be closable, so release the reference
-      storage = null;
+  // Returns AutoCloseable to avoid a runtime dependency on 
gcs-analytics-core. Cast via
+  // AnalyticsCoreUtil.
+  AutoCloseable gcsFileSystem() {
+    if (!gcpProperties.isGcsAnalyticsCoreEnabled()) {
+      return null;
     }
-  }
 
-  GcsFileSystem gcsFileSystem() {
     if (gcsFileSystem == null) {
       synchronized (this) {
         if (gcsFileSystem == null) {
-          this.gcsFileSystem = gcsFileSystemSupplier.get();
-          this.closeableGroup.addCloseable(gcsFileSystem);
+          this.gcsFileSystem =
+              AnalyticsCoreUtil.createFileSystem(
+                  propertiesWithUserAgent, credentials(gcpProperties));
         }
       }
     }
 
-    return this.gcsFileSystem;
+    return gcsFileSystem;
   }
 
   private Credentials credentials(GCPProperties properties) {
@@ -169,20 +182,4 @@ class PrefixedStorage implements AutoCloseable {
       throw new UncheckedIOException("Failed to create impersonated 
credentials for GCS", e);
     }
   }
-
-  private SerializableSupplier<GcsFileSystem> gcsFileSystemSupplier(
-      Map<String, String> properties) {
-    ImmutableMap.Builder<String, String> propertiesWithUserAgent =
-        new ImmutableMap.Builder<String, String>()
-            .putAll(properties)
-            .put("gcs.user-agent", GCS_FILE_IO_USER_AGENT);
-    GcsAnalyticsCoreOptions gcsAnalyticsCoreOptions =
-        new GcsAnalyticsCoreOptions("gcs.", propertiesWithUserAgent.build());
-    GcsFileSystemOptions fileSystemOptions = 
gcsAnalyticsCoreOptions.getGcsFileSystemOptions();
-    Credentials credentials = credentials(new GCPProperties(properties));
-    return () ->
-        credentials == null
-            ? new GcsFileSystemImpl(fileSystemOptions)
-            : new GcsFileSystemImpl(credentials, fileSystemOptions);
-  }
 }
diff --git 
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestAnalyticsCoreUtil.java 
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestAnalyticsCoreUtil.java
new file mode 100644
index 0000000000..d915553453
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestAnalyticsCoreUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+
+import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
+import com.google.cloud.gcs.analyticscore.client.GcsItemId;
+import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
+import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
+import com.google.cloud.storage.BlobId;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+public class TestAnalyticsCoreUtil {
+
+  @Test
+  public void readVectored() throws IOException {
+    GcsFileSystem fileSystem = mock(GcsFileSystem.class);
+    GoogleCloudStorageInputStream gcsInputStream = 
mock(GoogleCloudStorageInputStream.class);
+    BlobId blobId = BlobId.of("mockbucket", "mockname");
+
+    SeekableInputStream stream;
+    try (MockedStatic<GoogleCloudStorageInputStream> mocked =
+        mockStatic(GoogleCloudStorageInputStream.class)) {
+      mocked
+          .when(() -> GoogleCloudStorageInputStream.create(eq(fileSystem), 
any(GcsItemId.class)))
+          .thenReturn(gcsInputStream);
+      stream = AnalyticsCoreUtil.newStream(fileSystem, blobId, null, 
MetricsContext.nullMetrics());
+    }
+
+    CompletableFuture<ByteBuffer> future1 = new CompletableFuture<>();
+    CompletableFuture<ByteBuffer> future2 = new CompletableFuture<>();
+    List<FileRange> ranges =
+        List.of(new FileRange(future1, 10L, 100), new FileRange(future2, 0, 
50));
+    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+
+    ((RangeReadable) stream).readVectored(ranges, allocate);
+
+    List<GcsObjectRange> objectRanges =
+        List.of(
+            GcsObjectRange.builder()
+                .setOffset(10)
+                .setLength(100)
+                .setByteBufferFuture(future1)
+                .build(),
+            GcsObjectRange.builder()
+                .setOffset(0)
+                .setLength(50)
+                .setByteBufferFuture(future2)
+                .build());
+    verify(gcsInputStream).readVectored(objectRanges, allocate);
+  }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java 
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java
index e58e814bd8..e3d295805f 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java
@@ -158,8 +158,9 @@ public class TestGcsInputFile {
               metricsContext);
 
       try (SeekableInputStream stream = inputFile.newStream()) {
-        assertThat(stream).isInstanceOf(GcsInputStreamWrapper.class);
+        assertThat(stream).isNotInstanceOf(GCSInputStream.class);
       }
+      mocked.verify(() -> GoogleCloudStorageInputStream.create(gcsFileSystem, 
gcsFileInfo));
     }
   }
 
@@ -189,8 +190,9 @@ public class TestGcsInputFile {
               metricsContext);
 
       try (SeekableInputStream stream = inputFile.newStream()) {
-        assertThat(stream).isInstanceOf(GcsInputStreamWrapper.class);
+        assertThat(stream).isNotInstanceOf(GCSInputStream.class);
       }
+      mocked.verify(() -> GoogleCloudStorageInputStream.create(gcsFileSystem, 
itemId));
     }
   }
 
diff --git 
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java 
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java
deleted file mode 100644
index c6eae113d5..0000000000
--- 
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.gcp.gcs;
-
-import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
-import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
-import com.google.cloud.storage.BlobId;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.IntFunction;
-import org.apache.iceberg.io.FileRange;
-import org.apache.iceberg.metrics.MetricsContext;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-public class TestGcsInputStreamWrapper {
-
-  @Mock private GoogleCloudStorageInputStream googleCloudStorageInputStream;
-
-  private GcsInputStreamWrapper inputStreamWrapper;
-
-  @BeforeEach
-  public void before() {
-    inputStreamWrapper =
-        new GcsInputStreamWrapper(
-            googleCloudStorageInputStream,
-            BlobId.of("mockbucket", "mockname"),
-            MetricsContext.nullMetrics());
-  }
-
-  @Test
-  public void getPos() throws IOException {
-    inputStreamWrapper.getPos();
-
-    Mockito.verify(googleCloudStorageInputStream).getPos();
-  }
-
-  @Test
-  public void seek() throws IOException {
-    long newPos = 1234L;
-    inputStreamWrapper.seek(newPos);
-
-    Mockito.verify(googleCloudStorageInputStream).seek(newPos);
-  }
-
-  @Test
-  public void read() throws IOException {
-    inputStreamWrapper.read();
-
-    Mockito.verify(googleCloudStorageInputStream).read();
-  }
-
-  @Test
-  public void readByteArray() throws IOException {
-    byte[] buffer = new byte[1024];
-
-    inputStreamWrapper.read(buffer);
-
-    Mockito.verify(googleCloudStorageInputStream).read(buffer, 0, 
buffer.length);
-  }
-
-  @Test
-  public void readByteArrayWithOffset() throws IOException {
-    byte[] buffer = new byte[1024];
-    int off = 10;
-    int len = 100;
-
-    inputStreamWrapper.read(buffer, off, len);
-
-    Mockito.verify(googleCloudStorageInputStream).read(buffer, off, len);
-  }
-
-  @Test
-  public void readFully() throws IOException {
-    long position = 123L;
-    byte[] buffer = new byte[1024];
-    int offset = 10;
-    int length = 100;
-
-    inputStreamWrapper.readFully(position, buffer, offset, length);
-
-    Mockito.verify(googleCloudStorageInputStream).readFully(position, buffer, 
offset, length);
-  }
-
-  @Test
-  public void readTail() throws IOException {
-    byte[] buffer = new byte[1024];
-    int offset = 10;
-    int length = 100;
-
-    inputStreamWrapper.readTail(buffer, offset, length);
-
-    Mockito.verify(googleCloudStorageInputStream).readTail(buffer, offset, 
length);
-  }
-
-  @Test
-  public void readVectored() throws IOException {
-    CompletableFuture<ByteBuffer> future1 = new CompletableFuture<>();
-    CompletableFuture<ByteBuffer> future2 = new CompletableFuture<>();
-    List<FileRange> ranges =
-        List.of(new FileRange(future1, 10L, 100), new FileRange(future2, 0, 
50));
-    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
-
-    inputStreamWrapper.readVectored(ranges, allocate);
-    List<GcsObjectRange> objectRanges =
-        List.of(
-            GcsObjectRange.builder()
-                .setOffset(10)
-                .setLength(100)
-                .setByteBufferFuture(future1)
-                .build(),
-            GcsObjectRange.builder()
-                .setOffset(0)
-                .setLength(50)
-                .setByteBufferFuture(future2)
-                .build());
-
-    Mockito.verify(googleCloudStorageInputStream).readVectored(objectRanges, 
allocate);
-  }
-
-  @Test
-  public void close() throws IOException {
-    inputStreamWrapper.close();
-
-    Mockito.verify(googleCloudStorageInputStream).close();
-  }
-}
diff --git 
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestPrefixedStorage.java 
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestPrefixedStorage.java
index dd68b03766..0a06fcdd0c 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestPrefixedStorage.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestPrefixedStorage.java
@@ -115,17 +115,27 @@ public class TestPrefixedStorage {
         .isEqualTo(GCPProperties.GCS_IMPERSONATE_LIFETIME_SECONDS_DEFAULT);
   }
 
+  @Test
+  public void gcsFileSystemDisabledByDefault() {
+    Map<String, String> properties = 
ImmutableMap.of(GCPProperties.GCS_PROJECT_ID, "myProject");
+    PrefixedStorage storage = new PrefixedStorage("gs://bucket", properties, 
null);
+
+    assertThat(storage.gcsFileSystem()).isNull();
+  }
+
   @Test
   public void gcsFileSystem() {
     Map<String, String> properties =
-        ImmutableMap.of(
-            GCPProperties.GCS_PROJECT_ID, "myProject",
-            GCPProperties.GCS_USER_PROJECT, "userProject",
-            GCPProperties.GCS_CLIENT_LIB_TOKEN, "gccl",
-            GCPProperties.GCS_SERVICE_HOST, "example.com",
-            GCPProperties.GCS_DECRYPTION_KEY, "decryptionKey",
-            GCPProperties.GCS_ENCRYPTION_KEY, "encryptionKey",
-            GCPProperties.GCS_CHANNEL_READ_CHUNK_SIZE, "1024");
+        ImmutableMap.<String, String>builder()
+            .put(GCPProperties.GCS_ANALYTICS_CORE_ENABLED, "true")
+            .put(GCPProperties.GCS_PROJECT_ID, "myProject")
+            .put(GCPProperties.GCS_USER_PROJECT, "userProject")
+            .put(GCPProperties.GCS_CLIENT_LIB_TOKEN, "gccl")
+            .put(GCPProperties.GCS_SERVICE_HOST, "example.com")
+            .put(GCPProperties.GCS_DECRYPTION_KEY, "decryptionKey")
+            .put(GCPProperties.GCS_ENCRYPTION_KEY, "encryptionKey")
+            .put(GCPProperties.GCS_CHANNEL_READ_CHUNK_SIZE, "1024")
+            .build();
     PrefixedStorage storage = new PrefixedStorage("gs://bucket", properties, 
null);
     GcsFileSystemOptions expectedOptions =
         GcsFileSystemOptions.builder()
@@ -144,7 +154,7 @@ public class TestPrefixedStorage {
                     .build())
             .build();
 
-    GcsFileSystem fileSystem = storage.gcsFileSystem();
+    GcsFileSystem fileSystem = (GcsFileSystem) storage.gcsFileSystem();
 
     assertThat(fileSystem).isNotNull();
     assertThat(fileSystem.getGcsClient()).isNotNull();


Reply via email to