This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new f4a3e8b65 GH-3078: Use Hadoop FileSystem.openFile() to open files 
(#3079)
f4a3e8b65 is described below

commit f4a3e8b655d4bd8bd61b7982eaf4ec340fd4e333
Author: Steve Loughran <[email protected]>
AuthorDate: Wed Dec 4 07:18:42 2024 +0000

    GH-3078: Use Hadoop FileSystem.openFile() to open files (#3079)
---
 .../parquet/hadoop/util/HadoopInputFile.java       |  61 ++++-
 .../parquet/hadoop/util/wrapped/io/FutureIO.java   |  23 ++
 .../org/apache/hadoop/fs/FileSystemTestBinder.java |  77 ++++++
 .../parquet/hadoop/util/TestHadoopOpenFile.java    | 283 +++++++++++++++++++++
 4 files changed, 443 insertions(+), 1 deletion(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index 48c79fa5e..fa6a7bdf4 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -19,8 +19,13 @@
 
 package org.apache.parquet.hadoop.util;
 
+import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture;
+
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -29,6 +34,24 @@ import org.apache.parquet.io.SeekableInputStream;
 
 public class HadoopInputFile implements InputFile {
 
+  /**
+   * openFile() option name for setting the read policy: {@value}.
+   */
+  private static final String OPENFILE_READ_POLICY_KEY = 
"fs.option.openfile.read.policy";
+
+  /**
+   * Read policy when opening parquet files: {@value}.
+   * <p>Policy-aware stores pick the first policy they recognize in the list.
+   * Everything recognizes "random";
+   * "vector" came in with 3.4.0, while "parquet" came with Hadoop 3.4.1.
+   * parquet means "this is a Parquet file, so be clever about footers, 
prefetch,
+   * and expect vector and/or random IO".
+   * <p>In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random" 
for the
+   * S3A connector, but as the ABFS and GCS connectors do footer caching, they
+   * may use it as a hint to say "fetch the footer and keep it in memory"
+   */
+  private static final String PARQUET_READ_POLICY = "parquet, vector, random, 
adaptive";
+
   private final FileSystem fs;
   private final FileStatus stat;
   private final Configuration conf;
@@ -70,9 +93,45 @@ public class HadoopInputFile implements InputFile {
     return stat.getLen();
   }
 
+  /**
+   * Open the file.
+   * <p>Uses {@code FileSystem.openFile()} so that
+   * the existing FileStatus can be passed down: saves a HEAD request on cloud 
storage,
+   * and is ignored everywhere else.
+   *
+   * @return the input stream.
+   *
+   * @throws InterruptedIOException future was interrupted
+   * @throws IOException if something went wrong
+   * @throws RuntimeException any nested RTE thrown
+   */
   @Override
   public SeekableInputStream newStream() throws IOException {
-    return HadoopStreams.wrap(fs.open(stat.getPath()));
+    FSDataInputStream stream;
+    try {
+      // this method is async so that implementations may do async HEAD
+      // requests, such as S3A/ABFS when a file status is passed down.
+      final CompletableFuture<FSDataInputStream> future = 
fs.openFile(stat.getPath())
+          .withFileStatus(stat)
+          .opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
+          .build();
+      stream = awaitFuture(future);
+    } catch (RuntimeException e) {
+      // S3A < 3.3.5 would raise illegal path exception if the openFile path 
didn't
+      // equal the path in the FileStatus; Hive virtual FS could create this 
condition.
+      // As the path to open is derived from stat.getPath(), this condition 
seems
+      // near-impossible to create, but is handled here for due diligence.
+      try {
+        stream = fs.open(stat.getPath());
+      } catch (IOException | RuntimeException ex) {
+        // failure on this attempt attaches the failure of the openFile() call
+        // so the stack trace is preserved.
+        ex.addSuppressed(e);
+        throw ex;
+      }
+    }
+
+    return HadoopStreams.wrap(stream);
   }
 
   @Override
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
index 47f4e959b..23533c75b 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
@@ -70,6 +70,29 @@ public final class FutureIO {
     }
   }
 
+  /**
+   * Given a future, evaluate it.
+   * <p>
+   * Any exception generated in the future is
+   * extracted and rethrown.
+   * </p>
+   * @param future future to evaluate
+   * @param <T> type of the result.
+   * @return the result, if all went well.
+   * @throws InterruptedIOException future was interrupted
+   * @throws IOException if something went wrong
+   * @throws RuntimeException any nested RTE thrown
+   */
+  public static <T> T awaitFuture(final Future<T> future)
+      throws InterruptedIOException, IOException, RuntimeException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new 
InterruptedIOException(e.toString()).initCause(e);
+    } catch (ExecutionException e) {
+      throw unwrapInnerException(e);
+    }
+  }
   /**
    * From the inner cause of an execution exception, extract the inner cause
    * to an IOException, raising Errors immediately.
diff --git 
a/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java 
b/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java
new file mode 100644
index 000000000..2d087aa2d
--- /dev/null
+++ 
b/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Based on {@code org.apache.hadoop.fs.FileSystemTestHelper},
+ * this class exports the package private {@code FileSystem}
+ * methods which can be used to push FS instances into the
+ * map of URI -> fs instance.
+ * <p>
+ * This makes it easy to add instances of Mocked filesystems
+ * to the map, which will then be picked up by any
+ * code retrieving an FS instance for that URI
+ * <p>
+ * The API is stable and used elsewhere. What is important
+ * is to remove FS instances after each test case.
+ * {@link #cleanFilesystemCache()} cleans the entire cache
+ * and should be used in teardown methods.
+ */
+public final class FileSystemTestBinder {
+
+  /**
+   * Empty configuration.
+   * Part of the FileSystem method signatures, but not used behind them.
+   */
+  public static final Configuration CONF = new Configuration(false);
+
+  /**
+   * Inject a filesystem into the cache.
+   * @param uri filesystem URI
+   * @param fs filesystem to inject
+   * @throws UncheckedIOException Hadoop UGI problems.
+   */
+  public static void addFileSystemForTesting(URI uri, FileSystem fs) {
+    try {
+      FileSystem.addFileSystemForTesting(uri, CONF, fs);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Clean up the filesystem cache.
+   * This swallows any IOE nominally raised in the process, to ensure
+   * this can safely be invoked in teardowns.
+   */
+  public static void cleanFilesystemCache() {
+    try {
+      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+    } catch (IOException ignored) {
+      // Ignore the exception as if getCurrentUser() fails then it'll
+      // have been impossible to add mock instances to a per-user cache.
+    }
+  }
+}
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java
new file mode 100644
index 000000000..bf29541cd
--- /dev/null
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import static 
org.apache.hadoop.fs.FileSystemTestBinder.addFileSystemForTesting;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestBinder;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
+import org.apache.parquet.io.SeekableInputStream;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test suite to validate behavior of opening files through
+ * use of FileSystem.openFile(), especially fallback
+ * to the original FileSystem.open() method when
+ * openFile() raises an IllegalArgumentException or other RTE.
+ * <p>
+ * These tests use classes in the package {@code org.apache.hadoop.fs.impl}.
+ * Although an implementation package, it is tagged as 
`LimitedPrivate("Filesystems")`;
+ * it is already used outside the hadoop codebase (e.g. google gcs).
+ */
+public class TestHadoopOpenFile {
+
+  private static final int FIRST = MockHadoopInputStream.TEST_ARRAY[0];
+  private URI fsUri;
+  private FileStatus status;
+  private Path path;
+  private Configuration conf;
+  private IOException fileNotFound;
+  private IllegalArgumentException illegal;
+
+  @Before
+  public void setUp() throws URISyntaxException {
+    // the schema "mock:" is used to not only be confident our injected
+    // instance is picked up, but to ensure that there will be no
+    // contamination of any real schema, such as file:
+    fsUri = new URI("mock://path/");
+    path = new Path("mock://path/file");
+
+    conf = new Configuration(false);
+    fileNotFound = new FileNotFoundException("file not found");
+    illegal = new IllegalArgumentException("illegal");
+    status = new FileStatus(10, false, 1, 1, 0, path);
+  }
+
+  /**
+   * Clean up the entire FS cache for the current user.
+   */
+  @After
+  public void tearDown() {
+    FileSystemTestBinder.cleanFilesystemCache();
+  }
+
+  /**
+   * The healthy path for opening a file.
+   */
+  @Test
+  public void testOpenFileGoodPath() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+    final FSDataInputStream in = new FSDataInputStream(new 
MockHadoopInputStream());
+    final StubOpenFileBuilder opener = new StubOpenFileBuilder(mockFS, path, 
CompletableFuture.completedFuture(in));
+    doReturn(opener).when(mockFS).openFile(path);
+
+    // this looks up the FS binding via the status file path.
+    openAndRead(fileFromStatus());
+
+    // The fallback call of open(path) never took place.
+    Mockito.verify(mockFS, never()).open(path);
+  }
+
+  /**
+   * The openFile() call raises a RuntimeException which is caught and the
+   * classic open() call invoked.
+   */
+  @Test
+  public void testOpenFileEarlyFailure() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+    final FSDataInputStream in = new FSDataInputStream(new 
MockHadoopInputStream());
+
+    Mockito.doThrow(illegal).when(mockFS).openFile(path);
+    doReturn(in).when(mockFS).open(path);
+
+    // this looks up the FS binding via the status file path.
+    openAndRead(fileFromStatus());
+  }
+
+  /**
+   * openFile() failure during the completable future execution with an
+   * RTE raised.
+   * Again, this triggers a fallback to open().
+   */
+  @Test
+  public void testOpenFileLateFailure() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+    final FSDataInputStream in = new FSDataInputStream(new 
MockHadoopInputStream());
+
+    final StubOpenFileBuilder opener = new StubOpenFileBuilder(
+        mockFS, path, CompletableFuture.completedFuture(null).thenApply((f) -> 
{
+          throw illegal;
+        }));
+    doReturn(opener).when(mockFS).openFile(path);
+    doReturn(in).when(mockFS).open(path);
+
+    openAndRead(fileFromStatus());
+  }
+
+  /**
+   * Open a stream, read the first byte, and assert that it matches
+   * what is expected.
+   *
+   * @param inputFile input file
+   *
+   * @throws IOException failure to open
+   */
+  private static void openAndRead(final HadoopInputFile inputFile) throws 
IOException {
+    try (SeekableInputStream stream = inputFile.newStream()) {
+      Assert.assertEquals("byte read", FIRST, stream.read());
+    }
+  }
+
+  /**
+   * If openFile() raises an IOException within the future,
+   * then it is thrown and the classic open() call never invoked.
+   */
+  @Test
+  public void testOpenFileRaisesIOException() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+
+    final StubOpenFileBuilder opener = new StubOpenFileBuilder(
+        mockFS, path, CompletableFuture.completedFuture(null).thenApply((f) -> 
{
+          // throw a wrapped IOE
+          throw new UncheckedIOException(fileNotFound);
+        }));
+    doReturn(opener).when(mockFS).openFile(path);
+
+    final HadoopInputFile inputFile = fileFromStatus();
+    Assert.assertThrows(FileNotFoundException.class, inputFile::newStream);
+    Mockito.verify(mockFS, never()).open(path);
+  }
+
+  /**
+   * If openFile() raises a RuntimeException, it is caught and the 
classic open() call invoked.
+   * If that call raises an IOE:
+   * the IOE is thrown but the caught RTE is added to the
+   * suppressed list.
+   */
+  @Test
+  public void testOpenFileDoubleFailure() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+
+    Mockito.doThrow(illegal).when(mockFS).openFile(path);
+    Mockito.doThrow(fileNotFound).when(mockFS).open(path);
+
+    // this looks up the FS binding via the status file path.
+    final HadoopInputFile inputFile = fileFromStatus();
+
+    final FileNotFoundException caught = 
Assert.assertThrows(FileNotFoundException.class, inputFile::newStream);
+    Assert.assertSame(fileNotFound, caught);
+    final Throwable[] suppressed = caught.getSuppressed();
+    Assert.assertEquals("number of suppressed exceptions", 1, 
suppressed.length);
+    Assert.assertSame(illegal, suppressed[0]);
+  }
+
+  /**
+   * The handling of a double RTE is the same as the case of
+   * the sequence of: RTE, IOE.
+   */
+  @Test
+  public void testOpenFileDoubleRTE() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+
+    Mockito.doThrow(illegal).when(mockFS).openFile(path);
+    NullPointerException npe = new NullPointerException("null");
+    Mockito.doThrow(npe).when(mockFS).open(path);
+
+    // this looks up the FS binding via the status file path.
+    final HadoopInputFile inputFile = fileFromStatus();
+
+    final NullPointerException caught = 
Assert.assertThrows(NullPointerException.class, inputFile::newStream);
+    Assert.assertSame(npe, caught);
+    final Throwable[] suppressed = caught.getSuppressed();
+    Assert.assertEquals("number of suppressed exceptions", 1, 
suppressed.length);
+    Assert.assertSame(illegal, suppressed[0]);
+  }
+
+  /**
+   * Create a mock FileSystem with the foundational operations
+   * mocked. The FS is added as the binding for the mock URI.
+   *
+   * @return a mock FileSystem
+   *
+   * @throws IOException stub signature only.
+   */
+  private FileSystem prepareMockFS() throws IOException {
+    final FileSystem mockFS = mock(FileSystem.class);
+    doNothing().when(mockFS).close();
+    doReturn(conf).when(mockFS).getConf();
+    doReturn(status).when(mockFS).getFileStatus(path);
+
+    // register the FS instance under the mock URI
+    addFileSystemForTesting(fsUri, mockFS);
+    return mockFS;
+  }
+
+  /**
+   * Build an input file from the status field.
+   * @return an input file.
+   * @throws IOException failure to create the associated filesystem.
+   */
+  private HadoopInputFile fileFromStatus() throws IOException {
+    return HadoopInputFile.fromStatus(status, conf);
+  }
+
+  /**
+   * Stub implementation of {@link FutureDataInputStreamBuilder}.
+   * Trying to mock the interface is troublesome as the interface has added
+   * some new methods over time, instead this uses the base implementation 
class
+   * within o.a.h.fs.impl.
+   */
+  private static final class StubOpenFileBuilder extends 
FutureDataInputStreamBuilderImpl {
+
+    /**
+     * Operation to invoke to build the result.
+     */
+    private final CompletableFuture<FSDataInputStream> result;
+
+    /**
+     * Create the builder. The FS and path must be non-null.
+     *
+     * @param fileSystem fs
+     * @param path path to open
+     * @param result builder result.
+     */
+    private StubOpenFileBuilder(
+        final FileSystem fileSystem, Path path, final 
CompletableFuture<FSDataInputStream> result) {
+      super(fileSystem, path);
+      this.result = result;
+    }
+
+    @Override
+    public CompletableFuture<FSDataInputStream> build()
+        throws IllegalArgumentException, UnsupportedOperationException {
+      return result;
+    }
+  }
+}

Reply via email to