This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 42f3d01e6c Move all analyticscore references behind a
AnalyticsCoreUtil class (#16258)
42f3d01e6c is described below
commit 42f3d01e6ceaf84ae2810bf902d9cb7bcc4c4574
Author: Talat UYARER <[email protected]>
AuthorDate: Tue May 12 13:00:41 2026 -0700
Move all analyticscore references behind a AnalyticsCoreUtil class (#16258)
---
.../org/apache/iceberg/gcp/gcs/TestGcsFileIO.java | 2 +-
.../apache/iceberg/gcp/gcs/AnalyticsCoreUtil.java | 216 +++++++++++++++++++++
.../org/apache/iceberg/gcp/gcs/BaseGCSFile.java | 9 +-
.../org/apache/iceberg/gcp/gcs/GCSInputFile.java | 44 +----
.../org/apache/iceberg/gcp/gcs/GCSOutputFile.java | 5 +-
.../iceberg/gcp/gcs/GcsInputStreamWrapper.java | 143 --------------
.../apache/iceberg/gcp/gcs/PrefixedStorage.java | 65 +++----
.../iceberg/gcp/gcs/TestAnalyticsCoreUtil.java | 83 ++++++++
.../apache/iceberg/gcp/gcs/TestGcsInputFile.java | 6 +-
.../iceberg/gcp/gcs/TestGcsInputStreamWrapper.java | 150 --------------
.../iceberg/gcp/gcs/TestPrefixedStorage.java | 28 ++-
11 files changed, 366 insertions(+), 385 deletions(-)
diff --git
a/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java
b/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java
index 626aacd17d..1e956854b3 100644
--- a/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java
+++ b/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java
@@ -163,7 +163,7 @@ public class TestGcsFileIO {
IOUtil.readFully(is, actual, 0, expected.length);
}
- assertThat(inputStream).isInstanceOf(GcsInputStreamWrapper.class);
+ assertThat(inputStream).isNotInstanceOf(GCSInputStream.class);
assertThat(actual).isEqualTo(expected);
}
diff --git
a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/AnalyticsCoreUtil.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/AnalyticsCoreUtil.java
new file mode 100644
index 0000000000..4bc9659d77
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/AnalyticsCoreUtil.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.auth.Credentials;
+import com.google.cloud.gcs.analyticscore.client.GcsFileInfo;
+import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
+import com.google.cloud.gcs.analyticscore.client.GcsFileSystemImpl;
+import com.google.cloud.gcs.analyticscore.client.GcsFileSystemOptions;
+import com.google.cloud.gcs.analyticscore.client.GcsItemId;
+import com.google.cloud.gcs.analyticscore.client.GcsItemInfo;
+import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
+import com.google.cloud.gcs.analyticscore.core.GcsAnalyticsCoreOptions;
+import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
+import com.google.cloud.storage.BlobId;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.Counter;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.PropertyUtil;
+
+/**
+ * Gateway to the optional {@code com.google.cloud.gcs.analyticscore.*} dependency. All references
+ * to analytics-core types are confined to this class so that it is loaded only when {@link
+ * org.apache.iceberg.gcp.GCPProperties#GCS_ANALYTICS_CORE_ENABLED} is true.
+ */
+class AnalyticsCoreUtil {
+
+ private AnalyticsCoreUtil() {}
+
+ static AutoCloseable createFileSystem(Map<String, String> properties,
Credentials credentials) {
+ Preconditions.checkState(
+ PropertyUtil.propertyAsBoolean(properties,
GCPProperties.GCS_ANALYTICS_CORE_ENABLED, false),
+ "GCS analytics-core is disabled; %s must be set to true",
+ GCPProperties.GCS_ANALYTICS_CORE_ENABLED);
+ GcsAnalyticsCoreOptions options = new GcsAnalyticsCoreOptions("gcs.",
properties);
+ GcsFileSystemOptions fileSystemOptions = options.getGcsFileSystemOptions();
+ return credentials == null
+ ? new GcsFileSystemImpl(fileSystemOptions)
+ : new GcsFileSystemImpl(credentials, fileSystemOptions);
+ }
+
+ static SeekableInputStream newStream(
+ AutoCloseable fileSystemHandle, BlobId blobId, Long blobSize,
MetricsContext metrics)
+ throws IOException {
+ GcsFileSystem fileSystem = (GcsFileSystem) fileSystemHandle;
+ GcsItemId itemId = gcsItemId(blobId);
+ GoogleCloudStorageInputStream stream =
+ blobSize == null
+ ? GoogleCloudStorageInputStream.create(fileSystem, itemId)
+ : GoogleCloudStorageInputStream.create(
+ fileSystem, gcsFileInfo(blobId, itemId, blobSize));
+ return new GcsInputStreamWrapper(stream, blobId, metrics);
+ }
+
+ static void close(AutoCloseable fileSystemHandle) {
+ if (fileSystemHandle != null) {
+ ((GcsFileSystem) fileSystemHandle).close();
+ }
+ }
+
+ private static GcsItemId gcsItemId(BlobId blobId) {
+ GcsItemId.Builder builder =
+
GcsItemId.builder().setBucketName(blobId.getBucket()).setObjectName(blobId.getName());
+ if (blobId.getGeneration() != null) {
+ builder.setContentGeneration(blobId.getGeneration());
+ }
+
+ return builder.build();
+ }
+
+ private static GcsFileInfo gcsFileInfo(BlobId blobId, GcsItemId itemId, long
size) {
+ GcsItemInfo itemInfo =
GcsItemInfo.builder().setItemId(itemId).setSize(size).build();
+ return GcsFileInfo.builder()
+ .setItemInfo(itemInfo)
+ .setUri(URI.create(blobId.toGsUtilUri()))
+ .setAttributes(ImmutableMap.of())
+ .build();
+ }
+
+ private static class GcsInputStreamWrapper extends SeekableInputStream
implements RangeReadable {
+ private final Counter readBytes;
+ private final Counter readOperations;
+ private final GoogleCloudStorageInputStream stream;
+ private final BlobId blobId;
+
+ GcsInputStreamWrapper(
+ GoogleCloudStorageInputStream stream, BlobId blobId, MetricsContext
metrics) {
+ Preconditions.checkArgument(null != stream, "Invalid input stream :
null");
+ Preconditions.checkArgument(null != blobId, "Invalid blobId : null");
+ this.stream = stream;
+ this.blobId = blobId;
+ this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES,
MetricsContext.Unit.BYTES);
+ this.readOperations =
metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ stream.seek(newPos);
+ }
+
+ @Override
+ public int read() throws IOException {
+ int readByte;
+ try {
+ readByte = stream.read();
+ } catch (IOException e) {
+ GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+ throw e;
+ }
+ readBytes.increment();
+ readOperations.increment();
+ return readByte;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int bytesRead;
+ try {
+ bytesRead = stream.read(b, off, len);
+ } catch (IOException e) {
+ GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+ throw e;
+ }
+ if (bytesRead > 0) {
+ readBytes.increment(bytesRead);
+ }
+ readOperations.increment();
+ return bytesRead;
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int
length) throws IOException {
+ try {
+ stream.readFully(position, buffer, offset, length);
+ } catch (IOException e) {
+ GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+ throw e;
+ }
+ }
+
+ @Override
+ public int readTail(byte[] buffer, int offset, int length) throws
IOException {
+ try {
+ return stream.readTail(buffer, offset, length);
+ } catch (IOException e) {
+ GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+ throw e;
+ }
+ }
+
+ @Override
+ public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer>
allocate)
+ throws IOException {
+ List<GcsObjectRange> objectRanges =
+ ranges.stream()
+ .map(
+ fileRange ->
+ GcsObjectRange.builder()
+ .setOffset(fileRange.offset())
+ .setLength(fileRange.length())
+ .setByteBufferFuture(fileRange.byteBuffer())
+ .build())
+ .collect(Collectors.toList());
+ try {
+ stream.readVectored(objectRanges, allocate);
+ } catch (IOException e) {
+ GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+ }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
index 4ece6a2156..2a1440f3b4 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/BaseGCSFile.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.gcp.gcs;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
@@ -28,7 +27,9 @@ import org.apache.iceberg.metrics.MetricsContext;
abstract class BaseGCSFile {
private final Storage storage;
- private final GcsFileSystem gcsFileSystem;
+ // Using AutoCloseable avoids a runtime dependency on gcs-analytics-core. Cast via
+ // AnalyticsCoreUtil.
+ private final AutoCloseable gcsFileSystem;
private final GCPProperties gcpProperties;
private final BlobId blobId;
private Blob metadata;
@@ -36,7 +37,7 @@ abstract class BaseGCSFile {
BaseGCSFile(
Storage storage,
- GcsFileSystem gcsFileSystem,
+ AutoCloseable gcsFileSystem,
BlobId blobId,
GCPProperties gcpProperties,
MetricsContext metrics) {
@@ -55,7 +56,7 @@ abstract class BaseGCSFile {
return storage;
}
- GcsFileSystem gcsFileSystem() {
+ AutoCloseable gcsFileSystem() {
return gcsFileSystem;
}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
index 12dc71b5a1..9f1576f63d 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -18,20 +18,13 @@
*/
package org.apache.iceberg.gcp.gcs;
-import com.google.cloud.gcs.analyticscore.client.GcsFileInfo;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
-import com.google.cloud.gcs.analyticscore.client.GcsItemId;
-import com.google.cloud.gcs.analyticscore.client.GcsItemInfo;
-import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import java.io.IOException;
-import java.net.URI;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.MetricsContext;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +50,9 @@ class GCSInputFile extends BaseGCSFile implements InputFile {
GCSInputFile(
Storage storage,
- GcsFileSystem gcsFileSystem,
+ // Using AutoCloseable avoids a runtime dependency on gcs-analytics-core. Cast via
+ // AnalyticsCoreUtil.
+ AutoCloseable gcsFileSystem,
BlobId blobId,
Long blobSize,
GCPProperties gcpProperties,
@@ -79,7 +74,7 @@ class GCSInputFile extends BaseGCSFile implements InputFile {
public SeekableInputStream newStream() {
if (gcpProperties().isGcsAnalyticsCoreEnabled()) {
try {
- return newGoogleCloudStorageInputStream();
+ return AnalyticsCoreUtil.newStream(gcsFileSystem(), blobId(),
blobSize, metrics());
} catch (IOException e) {
LOG.error(
"Failed to create GCS analytics core input stream for {}, falling
back to default.",
@@ -90,35 +85,4 @@ class GCSInputFile extends BaseGCSFile implements InputFile {
return new GCSInputStream(storage(), blobId(), blobSize, gcpProperties(),
metrics());
}
-
- private SeekableInputStream newGoogleCloudStorageInputStream() throws
IOException {
- if (null == blobSize) {
- return new GcsInputStreamWrapper(
- GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsItemId()),
blobId(), metrics());
- }
-
- return new GcsInputStreamWrapper(
- GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsFileInfo()),
blobId(), metrics());
- }
-
- private GcsItemId gcsItemId() {
- BlobId blobId = blobId();
- GcsItemId.Builder builder =
-
GcsItemId.builder().setBucketName(blobId.getBucket()).setObjectName(blobId.getName());
- if (blobId.getGeneration() != null) {
- builder.setContentGeneration(blobId.getGeneration());
- }
-
- return builder.build();
- }
-
- private GcsFileInfo gcsFileInfo() {
- GcsItemId itemId = gcsItemId();
- GcsItemInfo itemInfo =
GcsItemInfo.builder().setItemId(itemId).setSize(getLength()).build();
- return GcsFileInfo.builder()
- .setItemInfo(itemInfo)
- .setUri(URI.create(blobId().toGsUtilUri()))
- .setAttributes(ImmutableMap.of())
- .build();
- }
}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
index aa9f038db7..18bf53b791 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.gcp.gcs;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import java.io.IOException;
@@ -44,7 +43,9 @@ class GCSOutputFile extends BaseGCSFile implements OutputFile
{
GCSOutputFile(
Storage storage,
- GcsFileSystem gcsFileSystem,
+ // Using AutoCloseable avoids a runtime dependency on gcs-analytics-core. Cast via
+ // AnalyticsCoreUtil.
+ AutoCloseable gcsFileSystem,
BlobId blobId,
GCPProperties gcpProperties,
MetricsContext metrics) {
diff --git
a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java
deleted file mode 100644
index 25ba7662dd..0000000000
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.gcp.gcs;
-
-import com.google.api.client.util.Preconditions;
-import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
-import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
-import com.google.cloud.storage.BlobId;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.function.IntFunction;
-import java.util.stream.Collectors;
-import org.apache.iceberg.io.FileIOMetricsContext;
-import org.apache.iceberg.io.FileRange;
-import org.apache.iceberg.io.RangeReadable;
-import org.apache.iceberg.io.SeekableInputStream;
-import org.apache.iceberg.metrics.Counter;
-import org.apache.iceberg.metrics.MetricsContext;
-
-class GcsInputStreamWrapper extends SeekableInputStream implements
RangeReadable {
- private final Counter readBytes;
- private final Counter readOperations;
- private final GoogleCloudStorageInputStream stream;
- private final BlobId blobId;
-
- GcsInputStreamWrapper(
- GoogleCloudStorageInputStream stream, BlobId blobId, MetricsContext
metrics) {
- Preconditions.checkArgument(null != stream, "Invalid input stream : null");
- Preconditions.checkArgument(null != blobId, "Invalid blobId : null");
- this.stream = stream;
- this.blobId = blobId;
- this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES,
MetricsContext.Unit.BYTES);
- this.readOperations =
metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
- }
-
- @Override
- public long getPos() throws IOException {
- return stream.getPos();
- }
-
- @Override
- public void seek(long newPos) throws IOException {
- stream.seek(newPos);
- }
-
- @Override
- public int read() throws IOException {
- int readByte;
- try {
- readByte = stream.read();
- } catch (IOException e) {
- GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
- throw e;
- }
- readBytes.increment();
- readOperations.increment();
- return readByte;
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- int bytesRead;
- try {
- bytesRead = stream.read(b, off, len);
- } catch (IOException e) {
- GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
- throw e;
- }
- if (bytesRead > 0) {
- readBytes.increment(bytesRead);
- }
- readOperations.increment();
- return bytesRead;
- }
-
- @Override
- public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
- try {
- stream.readFully(position, buffer, offset, length);
- } catch (IOException e) {
- GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
- throw e;
- }
- }
-
- @Override
- public int readTail(byte[] buffer, int offset, int length) throws
IOException {
- try {
- return stream.readTail(buffer, offset, length);
- } catch (IOException e) {
- GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
- throw e;
- }
- }
-
- @Override
- public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer>
allocate)
- throws IOException {
- List<GcsObjectRange> objectRanges =
- ranges.stream()
- .map(
- fileRange ->
- GcsObjectRange.builder()
- .setOffset(fileRange.offset())
- .setLength(fileRange.length())
- .setByteBufferFuture(fileRange.byteBuffer())
- .build())
- .collect(Collectors.toList());
- try {
- stream.readVectored(objectRanges, allocate);
- } catch (IOException e) {
- GCSExceptionUtil.throwNotFoundIfNotPresent(e, blobId);
- throw e;
- }
- }
-
- @Override
- public void close() throws IOException {
- stream.close();
- }
-}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
index 06017bb2e8..a442269c09 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java
@@ -23,10 +23,6 @@ import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ImpersonatedCredentials;
import com.google.cloud.NoCredentials;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystemImpl;
-import com.google.cloud.gcs.analyticscore.client.GcsFileSystemOptions;
-import com.google.cloud.gcs.analyticscore.core.GcsAnalyticsCoreOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
@@ -45,11 +41,11 @@ class PrefixedStorage implements AutoCloseable {
private static final String GCS_FILE_IO_USER_AGENT = "gcsfileio/" +
EnvironmentContext.get();
private final String storagePrefix;
private final GCPProperties gcpProperties;
+ private final Map<String, String> propertiesWithUserAgent;
private SerializableSupplier<Storage> storage;
private CloseableGroup closeableGroup;
private transient volatile Storage storageClient;
- private final SerializableSupplier<GcsFileSystem> gcsFileSystemSupplier;
- private transient volatile GcsFileSystem gcsFileSystem;
+ private transient volatile AutoCloseable gcsFileSystem;
PrefixedStorage(
String storagePrefix, Map<String, String> properties,
SerializableSupplier<Storage> storage) {
@@ -59,6 +55,11 @@ class PrefixedStorage implements AutoCloseable {
this.storagePrefix = storagePrefix;
this.storage = storage;
this.gcpProperties = new GCPProperties(properties);
+ this.propertiesWithUserAgent =
+ ImmutableMap.<String, String>builder()
+ .putAll(properties)
+ .put("gcs.user-agent", GCS_FILE_IO_USER_AGENT)
+ .build();
this.closeableGroup = new CloseableGroup();
if (null == storage) {
this.storage =
@@ -81,8 +82,6 @@ class PrefixedStorage implements AutoCloseable {
return builder.build().getService();
};
}
-
- this.gcsFileSystemSupplier = gcsFileSystemSupplier(properties);
}
public String storagePrefix() {
@@ -107,31 +106,45 @@ class PrefixedStorage implements AutoCloseable {
@Override
public void close() {
- if (null != closeableGroup) {
+ try {
try {
- closeableGroup.close();
+ if (null != closeableGroup) {
+ closeableGroup.close();
+ }
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
+ } finally {
+ if (null != gcsFileSystem) {
+ AnalyticsCoreUtil.close(gcsFileSystem);
+ gcsFileSystem = null;
+ }
+ }
+ } finally {
+ if (null != storage) {
+ // GCS Storage does not appear to be closable, so release the reference
+ storage = null;
}
}
+ }
- if (null != storage) {
- // GCS Storage does not appear to be closable, so release the reference
- storage = null;
+ // Returns AutoCloseable to avoid a runtime dependency on gcs-analytics-core. Cast via
+ // AnalyticsCoreUtil.
+ AutoCloseable gcsFileSystem() {
+ if (!gcpProperties.isGcsAnalyticsCoreEnabled()) {
+ return null;
}
- }
- GcsFileSystem gcsFileSystem() {
if (gcsFileSystem == null) {
synchronized (this) {
if (gcsFileSystem == null) {
- this.gcsFileSystem = gcsFileSystemSupplier.get();
- this.closeableGroup.addCloseable(gcsFileSystem);
+ this.gcsFileSystem =
+ AnalyticsCoreUtil.createFileSystem(
+ propertiesWithUserAgent, credentials(gcpProperties));
}
}
}
- return this.gcsFileSystem;
+ return gcsFileSystem;
}
private Credentials credentials(GCPProperties properties) {
@@ -169,20 +182,4 @@ class PrefixedStorage implements AutoCloseable {
throw new UncheckedIOException("Failed to create impersonated
credentials for GCS", e);
}
}
-
- private SerializableSupplier<GcsFileSystem> gcsFileSystemSupplier(
- Map<String, String> properties) {
- ImmutableMap.Builder<String, String> propertiesWithUserAgent =
- new ImmutableMap.Builder<String, String>()
- .putAll(properties)
- .put("gcs.user-agent", GCS_FILE_IO_USER_AGENT);
- GcsAnalyticsCoreOptions gcsAnalyticsCoreOptions =
- new GcsAnalyticsCoreOptions("gcs.", propertiesWithUserAgent.build());
- GcsFileSystemOptions fileSystemOptions =
gcsAnalyticsCoreOptions.getGcsFileSystemOptions();
- Credentials credentials = credentials(new GCPProperties(properties));
- return () ->
- credentials == null
- ? new GcsFileSystemImpl(fileSystemOptions)
- : new GcsFileSystemImpl(credentials, fileSystemOptions);
- }
}
diff --git
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestAnalyticsCoreUtil.java
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestAnalyticsCoreUtil.java
new file mode 100644
index 0000000000..d915553453
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestAnalyticsCoreUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+
+import com.google.cloud.gcs.analyticscore.client.GcsFileSystem;
+import com.google.cloud.gcs.analyticscore.client.GcsItemId;
+import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
+import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
+import com.google.cloud.storage.BlobId;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+public class TestAnalyticsCoreUtil {
+
+ @Test
+ public void readVectored() throws IOException {
+ GcsFileSystem fileSystem = mock(GcsFileSystem.class);
+ GoogleCloudStorageInputStream gcsInputStream =
mock(GoogleCloudStorageInputStream.class);
+ BlobId blobId = BlobId.of("mockbucket", "mockname");
+
+ SeekableInputStream stream;
+ try (MockedStatic<GoogleCloudStorageInputStream> mocked =
+ mockStatic(GoogleCloudStorageInputStream.class)) {
+ mocked
+ .when(() -> GoogleCloudStorageInputStream.create(eq(fileSystem),
any(GcsItemId.class)))
+ .thenReturn(gcsInputStream);
+ stream = AnalyticsCoreUtil.newStream(fileSystem, blobId, null,
MetricsContext.nullMetrics());
+ }
+
+ CompletableFuture<ByteBuffer> future1 = new CompletableFuture<>();
+ CompletableFuture<ByteBuffer> future2 = new CompletableFuture<>();
+ List<FileRange> ranges =
+ List.of(new FileRange(future1, 10L, 100), new FileRange(future2, 0,
50));
+ IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+
+ ((RangeReadable) stream).readVectored(ranges, allocate);
+
+ List<GcsObjectRange> objectRanges =
+ List.of(
+ GcsObjectRange.builder()
+ .setOffset(10)
+ .setLength(100)
+ .setByteBufferFuture(future1)
+ .build(),
+ GcsObjectRange.builder()
+ .setOffset(0)
+ .setLength(50)
+ .setByteBufferFuture(future2)
+ .build());
+ verify(gcsInputStream).readVectored(objectRanges, allocate);
+ }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java
index e58e814bd8..e3d295805f 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java
@@ -158,8 +158,9 @@ public class TestGcsInputFile {
metricsContext);
try (SeekableInputStream stream = inputFile.newStream()) {
- assertThat(stream).isInstanceOf(GcsInputStreamWrapper.class);
+ assertThat(stream).isNotInstanceOf(GCSInputStream.class);
}
+ mocked.verify(() -> GoogleCloudStorageInputStream.create(gcsFileSystem,
gcsFileInfo));
}
}
@@ -189,8 +190,9 @@ public class TestGcsInputFile {
metricsContext);
try (SeekableInputStream stream = inputFile.newStream()) {
- assertThat(stream).isInstanceOf(GcsInputStreamWrapper.class);
+ assertThat(stream).isNotInstanceOf(GCSInputStream.class);
}
+ mocked.verify(() -> GoogleCloudStorageInputStream.create(gcsFileSystem,
itemId));
}
}
diff --git
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java
deleted file mode 100644
index c6eae113d5..0000000000
---
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.gcp.gcs;
-
-import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
-import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
-import com.google.cloud.storage.BlobId;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.IntFunction;
-import org.apache.iceberg.io.FileRange;
-import org.apache.iceberg.metrics.MetricsContext;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-public class TestGcsInputStreamWrapper {
-
- @Mock private GoogleCloudStorageInputStream googleCloudStorageInputStream;
-
- private GcsInputStreamWrapper inputStreamWrapper;
-
- @BeforeEach
- public void before() {
- inputStreamWrapper =
- new GcsInputStreamWrapper(
- googleCloudStorageInputStream,
- BlobId.of("mockbucket", "mockname"),
- MetricsContext.nullMetrics());
- }
-
- @Test
- public void getPos() throws IOException {
- inputStreamWrapper.getPos();
-
- Mockito.verify(googleCloudStorageInputStream).getPos();
- }
-
- @Test
- public void seek() throws IOException {
- long newPos = 1234L;
- inputStreamWrapper.seek(newPos);
-
- Mockito.verify(googleCloudStorageInputStream).seek(newPos);
- }
-
- @Test
- public void read() throws IOException {
- inputStreamWrapper.read();
-
- Mockito.verify(googleCloudStorageInputStream).read();
- }
-
- @Test
- public void readByteArray() throws IOException {
- byte[] buffer = new byte[1024];
-
- inputStreamWrapper.read(buffer);
-
- Mockito.verify(googleCloudStorageInputStream).read(buffer, 0,
buffer.length);
- }
-
- @Test
- public void readByteArrayWithOffset() throws IOException {
- byte[] buffer = new byte[1024];
- int off = 10;
- int len = 100;
-
- inputStreamWrapper.read(buffer, off, len);
-
- Mockito.verify(googleCloudStorageInputStream).read(buffer, off, len);
- }
-
- @Test
- public void readFully() throws IOException {
- long position = 123L;
- byte[] buffer = new byte[1024];
- int offset = 10;
- int length = 100;
-
- inputStreamWrapper.readFully(position, buffer, offset, length);
-
- Mockito.verify(googleCloudStorageInputStream).readFully(position, buffer,
offset, length);
- }
-
- @Test
- public void readTail() throws IOException {
- byte[] buffer = new byte[1024];
- int offset = 10;
- int length = 100;
-
- inputStreamWrapper.readTail(buffer, offset, length);
-
- Mockito.verify(googleCloudStorageInputStream).readTail(buffer, offset,
length);
- }
-
- @Test
- public void readVectored() throws IOException {
- CompletableFuture<ByteBuffer> future1 = new CompletableFuture<>();
- CompletableFuture<ByteBuffer> future2 = new CompletableFuture<>();
- List<FileRange> ranges =
- List.of(new FileRange(future1, 10L, 100), new FileRange(future2, 0,
50));
- IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
-
- inputStreamWrapper.readVectored(ranges, allocate);
- List<GcsObjectRange> objectRanges =
- List.of(
- GcsObjectRange.builder()
- .setOffset(10)
- .setLength(100)
- .setByteBufferFuture(future1)
- .build(),
- GcsObjectRange.builder()
- .setOffset(0)
- .setLength(50)
- .setByteBufferFuture(future2)
- .build());
-
- Mockito.verify(googleCloudStorageInputStream).readVectored(objectRanges,
allocate);
- }
-
- @Test
- public void close() throws IOException {
- inputStreamWrapper.close();
-
- Mockito.verify(googleCloudStorageInputStream).close();
- }
-}
diff --git
a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestPrefixedStorage.java
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestPrefixedStorage.java
index dd68b03766..0a06fcdd0c 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestPrefixedStorage.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestPrefixedStorage.java
@@ -115,17 +115,27 @@ public class TestPrefixedStorage {
.isEqualTo(GCPProperties.GCS_IMPERSONATE_LIFETIME_SECONDS_DEFAULT);
}
+ @Test
+ public void gcsFileSystemDisabledByDefault() {
+ Map<String, String> properties =
ImmutableMap.of(GCPProperties.GCS_PROJECT_ID, "myProject");
+ PrefixedStorage storage = new PrefixedStorage("gs://bucket", properties,
null);
+
+ assertThat(storage.gcsFileSystem()).isNull();
+ }
+
@Test
public void gcsFileSystem() {
Map<String, String> properties =
- ImmutableMap.of(
- GCPProperties.GCS_PROJECT_ID, "myProject",
- GCPProperties.GCS_USER_PROJECT, "userProject",
- GCPProperties.GCS_CLIENT_LIB_TOKEN, "gccl",
- GCPProperties.GCS_SERVICE_HOST, "example.com",
- GCPProperties.GCS_DECRYPTION_KEY, "decryptionKey",
- GCPProperties.GCS_ENCRYPTION_KEY, "encryptionKey",
- GCPProperties.GCS_CHANNEL_READ_CHUNK_SIZE, "1024");
+ ImmutableMap.<String, String>builder()
+ .put(GCPProperties.GCS_ANALYTICS_CORE_ENABLED, "true")
+ .put(GCPProperties.GCS_PROJECT_ID, "myProject")
+ .put(GCPProperties.GCS_USER_PROJECT, "userProject")
+ .put(GCPProperties.GCS_CLIENT_LIB_TOKEN, "gccl")
+ .put(GCPProperties.GCS_SERVICE_HOST, "example.com")
+ .put(GCPProperties.GCS_DECRYPTION_KEY, "decryptionKey")
+ .put(GCPProperties.GCS_ENCRYPTION_KEY, "encryptionKey")
+ .put(GCPProperties.GCS_CHANNEL_READ_CHUNK_SIZE, "1024")
+ .build();
PrefixedStorage storage = new PrefixedStorage("gs://bucket", properties,
null);
GcsFileSystemOptions expectedOptions =
GcsFileSystemOptions.builder()
@@ -144,7 +154,7 @@ public class TestPrefixedStorage {
.build())
.build();
- GcsFileSystem fileSystem = storage.gcsFileSystem();
+ GcsFileSystem fileSystem = (GcsFileSystem) storage.gcsFileSystem();
assertThat(fileSystem).isNotNull();
assertThat(fileSystem.getGcsClient()).isNotNull();