This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a8ef09eb0b Core: Extend ResolvingFileIO to support prefix-based 
operations (#8334)
a8ef09eb0b is described below

commit a8ef09eb0b73b478ef8c9a710a6d6b3a673c1d4a
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Sep 13 17:06:06 2023 +0200

    Core: Extend ResolvingFileIO to support prefix-based operations (#8334)
---
 .../java/org/apache/iceberg/io/DelegateFileIO.java | 25 +++++++++
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   |  7 +--
 .../org/apache/iceberg/aws/s3/TestS3FileIO.java    | 16 ++++++
 .../org/apache/iceberg/hadoop/HadoopFileIO.java    |  7 +--
 .../org/apache/iceberg/io/ResolvingFileIO.java     | 65 ++++++++++++----------
 .../apache/iceberg/hadoop/HadoopFileIOTest.java    | 15 +++++
 .../org/apache/iceberg/io/TestResolvingIO.java     | 65 ++++++++++++++++------
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java |  6 +-
 .../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java  | 16 ++++++
 9 files changed, 162 insertions(+), 60 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java 
b/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java
new file mode 100644
index 0000000000..4b97b3cb28
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/io/DelegateFileIO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+/**
+ * This interface is intended as an extension for FileIO implementations that 
support being a
+ * delegate target.
+ */
+public interface DelegateFileIO extends FileIO, SupportsPrefixOperations, 
SupportsBulkOperations {}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index fa75906088..dd13e13f01 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -33,12 +33,10 @@ import org.apache.iceberg.aws.S3FileIOAwsClientFactories;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.CredentialSupplier;
-import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.io.SupportsBulkOperations;
-import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.metrics.MetricsContext;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -75,8 +73,7 @@ import software.amazon.awssdk.services.s3.model.Tagging;
  * schemes s3a, s3n, https are also treated as s3 file paths. Using this 
FileIO with other schemes
  * will result in {@link org.apache.iceberg.exceptions.ValidationException}.
  */
-public class S3FileIO
-    implements FileIO, SupportsBulkOperations, SupportsPrefixOperations, 
CredentialSupplier {
+public class S3FileIO implements CredentialSupplier, DelegateFileIO {
   private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java 
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index 761cd51431..a74e574c97 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -35,12 +35,14 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
@@ -48,6 +50,7 @@ import org.apache.iceberg.io.FileIOParser;
 import org.apache.iceberg.io.IOUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.jdbc.JdbcCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -361,6 +364,19 @@ public class TestS3FileIO {
         .isEqualTo(testS3FileIO.properties());
   }
 
+  @Test
+  public void testResolvingFileIOLoad() {
+    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
+    resolvingFileIO.setConf(new Configuration());
+    resolvingFileIO.initialize(ImmutableMap.of());
+    FileIO result =
+        DynMethods.builder("io")
+            .hiddenImpl(ResolvingFileIO.class, String.class)
+            .build(resolvingFileIO)
+            .invoke("s3://foo/bar");
+    Assertions.assertThat(result).isInstanceOf(S3FileIO.class);
+  }
+
   private void createRandomObjects(String prefix, int count) {
     S3URI s3URI = new S3URI(prefix);
 
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java 
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 04ead0bd67..7aaa2b6a75 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -32,12 +32,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
-import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.io.SupportsBulkOperations;
-import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableMap;
@@ -47,8 +45,7 @@ import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HadoopFileIO
-    implements FileIO, HadoopConfigurable, SupportsPrefixOperations, 
SupportsBulkOperations {
+public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopFileIO.class);
   private static final String DELETE_FILE_PARALLELISM = 
"iceberg.hadoop.delete-file-parallelism";
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java 
b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 7f4718b601..e5c7c51631 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -31,18 +31,22 @@ import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
-import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** FileIO implementation that uses location scheme to choose the correct 
FileIO implementation. */
-public class ResolvingFileIO implements FileIO, HadoopConfigurable, 
SupportsBulkOperations {
+/**
+ * FileIO implementation that uses location scheme to choose the correct 
FileIO implementation.
+ * Delegate FileIO implementations must implement the {@link DelegateFileIO} 
mixin interface,
+ * otherwise initialization will fail.
+ */
+public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
   private static final Logger LOG = 
LoggerFactory.getLogger(ResolvingFileIO.class);
   private static final int BATCH_SIZE = 100_000;
   private static final String FALLBACK_IMPL = 
"org.apache.iceberg.hadoop.HadoopFileIO";
@@ -58,7 +62,7 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable, SupportsBulk
           "abfs", ADLS_FILE_IO_IMPL,
           "abfss", ADLS_FILE_IO_IMPL);
 
-  private final Map<String, FileIO> ioInstances = Maps.newConcurrentMap();
+  private final Map<String, DelegateFileIO> ioInstances = 
Maps.newConcurrentMap();
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
   private final transient StackTraceElement[] createStack;
   private SerializableMap<String, String> properties;
@@ -98,25 +102,12 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable, SupportsBulk
     Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE)
         .forEachRemaining(
             partitioned -> {
-              Map<FileIO, List<String>> pathByFileIO =
+              Map<DelegateFileIO, List<String>> pathByFileIO =
                   
partitioned.stream().collect(Collectors.groupingBy(this::io));
-              for (Map.Entry<FileIO, List<String>> entries : 
pathByFileIO.entrySet()) {
-                FileIO io = entries.getKey();
+              for (Map.Entry<DelegateFileIO, List<String>> entries : 
pathByFileIO.entrySet()) {
+                DelegateFileIO io = entries.getKey();
                 List<String> filePaths = entries.getValue();
-                if (io instanceof SupportsBulkOperations) {
-                  ((SupportsBulkOperations) io).deleteFiles(filePaths);
-                } else {
-                  LOG.warn(
-                      "IO {} does not support bulk operations. Using non-bulk 
deletes.",
-                      io.getClass().getName());
-                  Tasks.Builder<String> deleteTasks =
-                      Tasks.foreach(filePaths)
-                          .noRetry()
-                          .suppressFailureWhenFinished()
-                          .onFailure(
-                              (file, exc) -> LOG.warn("Failed to delete file: 
{}", file, exc));
-                  deleteTasks.run(io::deleteFile);
-                }
+                io.deleteFiles(filePaths);
               }
             });
   }
@@ -136,12 +127,12 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable, SupportsBulk
   @Override
   public void close() {
     if (isClosed.compareAndSet(false, true)) {
-      List<FileIO> instances = Lists.newArrayList();
+      List<DelegateFileIO> instances = Lists.newArrayList();
 
       instances.addAll(ioInstances.values());
       ioInstances.clear();
 
-      for (FileIO io : instances) {
+      for (DelegateFileIO io : instances) {
         io.close();
       }
     }
@@ -164,9 +155,9 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable, SupportsBulk
   }
 
   @VisibleForTesting
-  FileIO io(String location) {
+  DelegateFileIO io(String location) {
     String impl = implFromLocation(location);
-    FileIO io = ioInstances.get(impl);
+    DelegateFileIO io = ioInstances.get(impl);
     if (io != null) {
       if (io instanceof HadoopConfigurable && ((HadoopConfigurable) 
io).getConf() == null) {
         synchronized (io) {
@@ -184,13 +175,14 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable, SupportsBulk
         impl,
         key -> {
           Configuration conf = hadoopConf.get();
+          FileIO fileIO;
 
           try {
             Map<String, String> props = Maps.newHashMap(properties);
             // ResolvingFileIO is keeping track of the creation stacktrace, so 
no need to do the
             // same in S3FileIO.
             props.put("init-creation-stacktrace", "false");
-            return CatalogUtil.loadFileIO(key, props, conf);
+            fileIO = CatalogUtil.loadFileIO(key, props, conf);
           } catch (IllegalArgumentException e) {
             if (key.equals(FALLBACK_IMPL)) {
               // no implementation to fall back to, throw the exception
@@ -203,7 +195,7 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable, SupportsBulk
                   FALLBACK_IMPL,
                   e);
               try {
-                return CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
+                fileIO = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, 
conf);
               } catch (IllegalArgumentException suppressed) {
                 LOG.warn(
                     "Failed to load FileIO implementation: {} (fallback)",
@@ -216,10 +208,17 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable, SupportsBulk
               }
             }
           }
+
+          Preconditions.checkState(
+              fileIO instanceof DelegateFileIO,
+              "FileIO does not implement DelegateFileIO: " + 
fileIO.getClass().getName());
+
+          return (DelegateFileIO) fileIO;
         });
   }
 
-  private static String implFromLocation(String location) {
+  @VisibleForTesting
+  String implFromLocation(String location) {
     return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
   }
 
@@ -255,4 +254,14 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable, SupportsBulk
       }
     }
   }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    return io(prefix).listPrefix(prefix);
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+    io(prefix).deletePrefix(prefix);
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java 
b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
index 636c94069d..0a195819e1 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
@@ -168,6 +170,19 @@ public class HadoopFileIOTest {
         .isEqualTo(testHadoopFileIO.properties());
   }
 
+  @Test
+  public void testResolvingFileIOLoad() {
+    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
+    resolvingFileIO.setConf(new Configuration());
+    resolvingFileIO.initialize(ImmutableMap.of());
+    FileIO result =
+        DynMethods.builder("io")
+            .hiddenImpl(ResolvingFileIO.class, String.class)
+            .build(resolvingFileIO)
+            .invoke("hdfs://foo/bar");
+    Assertions.assertThat(result).isInstanceOf(HadoopFileIO.class);
+  }
+
   private List<Path> createRandomFiles(Path parent, int count) {
     Vector<Path> paths = new Vector<>();
     random
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java 
b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index ff1499bff7..f072053eea 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -19,9 +19,13 @@
 package org.apache.iceberg.io;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.withSettings;
 
 import java.io.IOException;
 import java.util.List;
@@ -33,7 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.inmemory.InMemoryFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -130,27 +133,53 @@ public class TestResolvingIO {
   }
 
   @Test
-  public void resolveFileIONonBulkDeletion() {
+  public void delegateFileIOWithPrefixBasedSupport() throws IOException {
     ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
-    String parentPath = "inmemory://foo.db/bar";
-    // configure delegation IO
-    InMemoryFileIO delegation = new InMemoryFileIO();
-    doReturn(delegation).when(resolvingFileIO).io(anyString());
-    // write
-    byte[] someData = "some data".getBytes();
-    List<String> randomFilePaths =
+    Configuration hadoopConf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(hadoopConf);
+    Path parent = new Path(temp.toUri());
+    HadoopFileIO delegate = new HadoopFileIO(hadoopConf);
+    doReturn(delegate).when(resolvingFileIO).io(anyString());
+
+    List<Path> paths =
         IntStream.range(1, 10)
-            .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID())
+            .mapToObj(i -> new Path(parent, "random-" + i + "-" + 
UUID.randomUUID()))
             .collect(Collectors.toList());
-    for (String randomFilePath : randomFilePaths) {
-      delegation.addFile(randomFilePath, someData);
-      assertThat(delegation.fileExists(randomFilePath)).isTrue();
+    for (Path path : paths) {
+      fs.createNewFile(path);
+      assertThat(delegate.newInputFile(path.toString()).exists()).isTrue();
     }
-    // non-bulk deletion
-    resolvingFileIO.deleteFiles(randomFilePaths);
 
-    for (String path : randomFilePaths) {
-      assertThat(delegation.fileExists(path)).isFalse();
-    }
+    paths.stream()
+        .map(Path::toString)
+        .forEach(
+            path -> {
+              // HadoopFileIO can only list prefixes that match the full path
+              assertThat(resolvingFileIO.listPrefix(path)).hasSize(1);
+              resolvingFileIO.deletePrefix(path);
+              assertThat(delegate.newInputFile(path).exists()).isFalse();
+            });
+  }
+
+  @Test
+  public void delegateFileIOWithAndWithoutMixins() {
+    ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
+    resolvingFileIO.setConf(new Configuration());
+    resolvingFileIO.initialize(ImmutableMap.of());
+
+    String fileIONoMixins = mock(FileIO.class).getClass().getName();
+    doReturn(fileIONoMixins).when(resolvingFileIO).implFromLocation(any());
+    assertThatThrownBy(() -> resolvingFileIO.newInputFile("/file"))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageStartingWith(
+            "FileIO does not implement DelegateFileIO: 
org.apache.iceberg.io.FileIO");
+
+    String fileIOWithMixins =
+        mock(FileIO.class, 
withSettings().extraInterfaces(DelegateFileIO.class))
+            .getClass()
+            .getName();
+    doReturn(fileIOWithMixins).when(resolvingFileIO).implFromLocation(any());
+    // being null is ok here as long as the code doesn't throw an exception
+    assertThat(resolvingFileIO.newInputFile("/file")).isNull();
   }
 }
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java 
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 1e164f059d..09eb4a7400 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -30,12 +30,10 @@ import java.util.stream.Stream;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.BulkDeletionFailureException;
-import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.io.SupportsBulkOperations;
-import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.metrics.MetricsContext;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
@@ -56,7 +54,7 @@ import org.slf4j.LoggerFactory;
  * <p>See <a 
href="https://cloud.google.com/storage/docs/folders#overview";>Cloud Storage
  * Overview</a>
  */
-public class GCSFileIO implements FileIO, SupportsBulkOperations, 
SupportsPrefixOperations {
+public class GCSFileIO implements DelegateFileIO {
   private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java 
b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
index 013c4d4955..50aa735605 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -35,12 +35,15 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Random;
 import java.util.stream.StreamSupport;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.IOUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -206,4 +209,17 @@ public class GCSFileIOTest {
 
     
assertThat(testGCSFileIO.properties()).isEqualTo(roundTripSerializedFileIO.properties());
   }
+
+  @Test
+  public void testResolvingFileIOLoad() {
+    ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
+    resolvingFileIO.setConf(new Configuration());
+    resolvingFileIO.initialize(ImmutableMap.of());
+    FileIO result =
+        DynMethods.builder("io")
+            .hiddenImpl(ResolvingFileIO.class, String.class)
+            .build(resolvingFileIO)
+            .invoke("gs://foo/bar");
+    assertThat(result).isInstanceOf(GCSFileIO.class);
+  }
 }

Reply via email to