This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new f4a3e8b65 GH-3078: Use Hadoop FileSystem.openFile() to open files
(#3079)
f4a3e8b65 is described below
commit f4a3e8b655d4bd8bd61b7982eaf4ec340fd4e333
Author: Steve Loughran <[email protected]>
AuthorDate: Wed Dec 4 07:18:42 2024 +0000
GH-3078: Use Hadoop FileSystem.openFile() to open files (#3079)
---
.../parquet/hadoop/util/HadoopInputFile.java | 61 ++++-
.../parquet/hadoop/util/wrapped/io/FutureIO.java | 23 ++
.../org/apache/hadoop/fs/FileSystemTestBinder.java | 77 ++++++
.../parquet/hadoop/util/TestHadoopOpenFile.java | 283 +++++++++++++++++++++
4 files changed, 443 insertions(+), 1 deletion(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index 48c79fa5e..fa6a7bdf4 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -19,8 +19,13 @@
package org.apache.parquet.hadoop.util;
+import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture;
+
import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -29,6 +34,24 @@ import org.apache.parquet.io.SeekableInputStream;
public class HadoopInputFile implements InputFile {
+ /**
+ * openFile() option name for setting the read policy: {@value}.
+ */
+ private static final String OPENFILE_READ_POLICY_KEY =
"fs.option.openfile.read.policy";
+
+ /**
+ * Read policy when opening parquet files: {@value}.
+ * <p>Policy-aware stores pick the first policy they recognize in the list.
+ * Everything recognizes "random";
+ * "vector" came in with 3.4.0, while "parquet" came with Hadoop 3.4.1
+ * parquet means "this is a Parquet file, so be clever about footers,
prefetch,
+ * and expect vector and/or random IO".
+ * <p>In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random"
for the
+ * S3A connector, but as the ABFS and GCS connectors do footer caching, they
+ * may use it as a hint to say "fetch the footer and keep it in memory"
+ */
+ private static final String PARQUET_READ_POLICY = "parquet, vector, random,
adaptive";
+
private final FileSystem fs;
private final FileStatus stat;
private final Configuration conf;
@@ -70,9 +93,45 @@ public class HadoopInputFile implements InputFile {
return stat.getLen();
}
+ /**
+ * Open the file.
+ * <p>Uses {@code FileSystem.openFile()} so that
+ * the existing FileStatus can be passed down: saves a HEAD request on cloud
storage,
+ * and is ignored everywhere else.
+ *
+ * @return the input stream.
+ *
+ * @throws InterruptedIOException future was interrupted
+ * @throws IOException if something went wrong
+ * @throws RuntimeException any nested RTE thrown
+ */
@Override
public SeekableInputStream newStream() throws IOException {
- return HadoopStreams.wrap(fs.open(stat.getPath()));
+ FSDataInputStream stream;
+ try {
+ // this method is async so that implementations may do async HEAD
+ // requests, such as S3A/ABFS when a file status is passed down.
+ final CompletableFuture<FSDataInputStream> future =
fs.openFile(stat.getPath())
+ .withFileStatus(stat)
+ .opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
+ .build();
+ stream = awaitFuture(future);
+ } catch (RuntimeException e) {
+ // S3A < 3.3.5 would raise illegal path exception if the openFile path
didn't
+ // equal the path in the FileStatus; Hive virtual FS could create this
condition.
+ // As the path to open is derived from stat.getPath(), this condition
seems
+ // near-impossible to create — but it is handled here for due diligence.
+ try {
+ stream = fs.open(stat.getPath());
+ } catch (IOException | RuntimeException ex) {
+ // failure on this attempt attaches the failure of the openFile() call
+ // so the stack trace is preserved.
+ ex.addSuppressed(e);
+ throw ex;
+ }
+ }
+
+ return HadoopStreams.wrap(stream);
}
@Override
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
index 47f4e959b..23533c75b 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
@@ -70,6 +70,29 @@ public final class FutureIO {
}
}
+ /**
+ * Given a future, evaluate it.
+ * <p>
+ * Any exception generated in the future is
+ * extracted and rethrown.
+ * </p>
+ * @param future future to evaluate
+ * @param <T> type of the result.
+ * @return the result, if all went well.
+ * @throws InterruptedIOException future was interrupted
+ * @throws IOException if something went wrong
+ * @throws RuntimeException any nested RTE thrown
+ */
+ public static <T> T awaitFuture(final Future<T> future)
+ throws InterruptedIOException, IOException, RuntimeException {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new
InterruptedIOException(e.toString()).initCause(e);
+ } catch (ExecutionException e) {
+ throw unwrapInnerException(e);
+ }
+ }
/**
* From the inner cause of an execution exception, extract the inner cause
* to an IOException, raising Errors immediately.
diff --git
a/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java
b/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java
new file mode 100644
index 000000000..2d087aa2d
--- /dev/null
+++
b/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Based on {@code org.apache.hadoop.fs.FileSystemTestHelper},
+ * This class exports the package private {@code FileSystem}
+ * methods which can be used to push FS instances into the
+ * map of URI -> fs instance.
+ * <p>
+ * This makes it easy to add instances of mocked filesystems
+ * to the map, which will then be picked up by any
+ * code retrieving an FS instance for that URI.
+ * <p>
+ * The API is stable and used elsewhere. What is important
+ * is to remove FS instances after each test case.
+ * {@link #cleanFilesystemCache()} cleans the entire cache
+ * and should be used in teardown methods.
+ */
+public final class FileSystemTestBinder {
+
+ /**
+ * Empty configuration.
+ * Part of the FileSystem method signatures, but not used behind them.
+ */
+ public static final Configuration CONF = new Configuration(false);
+
+ /**
+ * Inject a filesystem into the cache.
+ * @param uri filesystem URI
+ * @param fs filesystem to inject
+ * @throws UncheckedIOException Hadoop UGI problems.
+ */
+ public static void addFileSystemForTesting(URI uri, FileSystem fs) {
+ try {
+ FileSystem.addFileSystemForTesting(uri, CONF, fs);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Clean up the filesystem cache.
+ * This swallows any IOE nominally raised in the process, to ensure
+ * this can safely be invoked in teardowns.
+ */
+ public static void cleanFilesystemCache() {
+ try {
+ FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+ } catch (IOException ignored) {
+ // Ignore the exception as if getCurrentUser() fails then it'll
+ // have been impossible to add mock instances to a per-user cache.
+ }
+ }
+}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java
new file mode 100644
index 000000000..bf29541cd
--- /dev/null
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import static
org.apache.hadoop.fs.FileSystemTestBinder.addFileSystemForTesting;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestBinder;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
+import org.apache.parquet.io.SeekableInputStream;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test suite to validate behavior of opening files through
+ * use of FileSystem.openFile(), especially fallback
+ * to the original FileSystem.open() method when
+ * openFile() raises an IllegalArgumentException or other RTE.
+ * <p>
+ * These tests use classes in the package {@code org.apache.hadoop.fs.impl}.
+ * Although an implementation package, it is tagged as
`LimitedPrivate("Filesystems")`;
+ * it is already used outside the hadoop codebase (e.g. google gcs).
+ */
+public class TestHadoopOpenFile {
+
+ private static final int FIRST = MockHadoopInputStream.TEST_ARRAY[0];
+ private URI fsUri;
+ private FileStatus status;
+ private Path path;
+ private Configuration conf;
+ private IOException fileNotFound;
+ private IllegalArgumentException illegal;
+
+ @Before
+ public void setUp() throws URISyntaxException {
+ // the schema "mock:" is used to not only be confident our injected
+ // instance is picked up, but to ensure that there will be no
+ // contamination of any real schema, such as file:
+ fsUri = new URI("mock://path/");
+ path = new Path("mock://path/file");
+
+ conf = new Configuration(false);
+ fileNotFound = new FileNotFoundException("file not found");
+ illegal = new IllegalArgumentException("illegal");
+ status = new FileStatus(10, false, 1, 1, 0, path);
+ }
+
+ /**
+ * Clean up the entire FS cache for the current user.
+ */
+ @After
+ public void tearDown() {
+ FileSystemTestBinder.cleanFilesystemCache();
+ }
+
+ /**
+ * The healthy path for opening a file.
+ */
+ @Test
+ public void testOpenFileGoodPath() throws Throwable {
+ final FileSystem mockFS = prepareMockFS();
+ final FSDataInputStream in = new FSDataInputStream(new
MockHadoopInputStream());
+ final StubOpenFileBuilder opener = new StubOpenFileBuilder(mockFS, path,
CompletableFuture.completedFuture(in));
+ doReturn(opener).when(mockFS).openFile(path);
+
+ // this looks up the FS binding via the status file path.
+ openAndRead(fileFromStatus());
+
+ // The fallback call of open(path) never took place.
+ Mockito.verify(mockFS, never()).open(path);
+ }
+
+ /**
+ * The openFile() call raises a RuntimeException which is caught and the
+ * classic open() call invoked.
+ */
+ @Test
+ public void testOpenFileEarlyFailure() throws Throwable {
+ final FileSystem mockFS = prepareMockFS();
+ final FSDataInputStream in = new FSDataInputStream(new
MockHadoopInputStream());
+
+ Mockito.doThrow(illegal).when(mockFS).openFile(path);
+ doReturn(in).when(mockFS).open(path);
+
+ // this looks up the FS binding via the status file path.
+ openAndRead(fileFromStatus());
+ }
+
+ /**
+ * openFile() failure during the completable future execution with an
+ * RTE raised.
+ * Again, this triggers a fallback to open().
+ */
+ @Test
+ public void testOpenFileLateFailure() throws Throwable {
+ final FileSystem mockFS = prepareMockFS();
+ final FSDataInputStream in = new FSDataInputStream(new
MockHadoopInputStream());
+
+ final StubOpenFileBuilder opener = new StubOpenFileBuilder(
+ mockFS, path, CompletableFuture.completedFuture(null).thenApply((f) ->
{
+ throw illegal;
+ }));
+ doReturn(opener).when(mockFS).openFile(path);
+ doReturn(in).when(mockFS).open(path);
+
+ openAndRead(fileFromStatus());
+ }
+
+ /**
+ * Open a stream, read the first byte, and assert that it matches
+ * what is expected.
+ *
+ * @param inputFile input file
+ *
+ * @throws IOException failure to open
+ */
+ private static void openAndRead(final HadoopInputFile inputFile) throws
IOException {
+ try (SeekableInputStream stream = inputFile.newStream()) {
+ Assert.assertEquals("byte read", FIRST, stream.read());
+ }
+ }
+
+ /**
+ * If openFile() raises an IOException within the future,
+ * then it is thrown and the classic open() call never invoked.
+ */
+ @Test
+ public void testOpenFileRaisesIOException() throws Throwable {
+ final FileSystem mockFS = prepareMockFS();
+
+ final StubOpenFileBuilder opener = new StubOpenFileBuilder(
+ mockFS, path, CompletableFuture.completedFuture(null).thenApply((f) ->
{
+ // throw a wrapped IOE
+ throw new UncheckedIOException(fileNotFound);
+ }));
+ doReturn(opener).when(mockFS).openFile(path);
+
+ final HadoopInputFile inputFile = fileFromStatus();
+ Assert.assertThrows(FileNotFoundException.class, inputFile::newStream);
+ Mockito.verify(mockFS, never()).open(path);
+ }
+
+ /**
+ * If openFile() raises a RuntimeException, it is caught and the
+ * classic open() call is invoked.
+ * If that call raises an IOE:
+ * Outcome: the IOE is thrown but the caught RTE is added to the
+ * suppressed list.
+ */
+ @Test
+ public void testOpenFileDoubleFailure() throws Throwable {
+ final FileSystem mockFS = prepareMockFS();
+
+ Mockito.doThrow(illegal).when(mockFS).openFile(path);
+ Mockito.doThrow(fileNotFound).when(mockFS).open(path);
+
+ // this looks up the FS binding via the status file path.
+ final HadoopInputFile inputFile = fileFromStatus();
+
+ final FileNotFoundException caught =
Assert.assertThrows(FileNotFoundException.class, inputFile::newStream);
+ Assert.assertSame(fileNotFound, caught);
+ final Throwable[] suppressed = caught.getSuppressed();
+ Assert.assertEquals("number of suppressed exceptions", 1,
suppressed.length);
+ Assert.assertSame(illegal, suppressed[0]);
+ }
+
+ /**
+ * The handling of a double RTE is the same as the case of
+ * the sequence of: RTE, IOE.
+ */
+ @Test
+ public void testOpenFileDoubleRTE() throws Throwable {
+ final FileSystem mockFS = prepareMockFS();
+
+ Mockito.doThrow(illegal).when(mockFS).openFile(path);
+ NullPointerException npe = new NullPointerException("null");
+ Mockito.doThrow(npe).when(mockFS).open(path);
+
+ // this looks up the FS binding via the status file path.
+ final HadoopInputFile inputFile = fileFromStatus();
+
+ final NullPointerException caught =
Assert.assertThrows(NullPointerException.class, inputFile::newStream);
+ Assert.assertSame(npe, caught);
+ final Throwable[] suppressed = caught.getSuppressed();
+ Assert.assertEquals("number of suppressed exceptions", 1,
suppressed.length);
+ Assert.assertSame(illegal, suppressed[0]);
+ }
+
+ /**
+ * Create a mock FileSystem with the foundational operations
+ * mocked. The FS is added as the binding for the mock URI.
+ *
+ * @return a mock FileSystem
+ *
+ * @throws IOException stub signature only.
+ */
+ private FileSystem prepareMockFS() throws IOException {
+ final FileSystem mockFS = mock(FileSystem.class);
+ doNothing().when(mockFS).close();
+ doReturn(conf).when(mockFS).getConf();
+ doReturn(status).when(mockFS).getFileStatus(path);
+
+ // register the FS instance under the mock URI
+ addFileSystemForTesting(fsUri, mockFS);
+ return mockFS;
+ }
+
+ /**
+ * Build an input file from the status field.
+ * @return an input file.
+ * @throws IOException failure to create the associated filesystem.
+ */
+ private HadoopInputFile fileFromStatus() throws IOException {
+ return HadoopInputFile.fromStatus(status, conf);
+ }
+
+ /**
+ * Stub implementation of {@link FutureDataInputStreamBuilder}.
+ * Trying to mock the interface is troublesome as the interface has added
+ * some new methods over time, instead this uses the base implementation
class
+ * within o.a.h.fs.impl.
+ */
+ private static final class StubOpenFileBuilder extends
FutureDataInputStreamBuilderImpl {
+
+ /**
+ * Operation to invoke to build the result.
+ */
+ private final CompletableFuture<FSDataInputStream> result;
+
+ /**
+ * Create the builder. The FS and path must be non-null.
+ *
+ * @param fileSystem fs
+ * @param path path to open
+ * @param result builder result.
+ */
+ private StubOpenFileBuilder(
+ final FileSystem fileSystem, Path path, final
CompletableFuture<FSDataInputStream> result) {
+ super(fileSystem, path);
+ this.result = result;
+ }
+
+ @Override
+ public CompletableFuture<FSDataInputStream> build()
+ throws IllegalArgumentException, UnsupportedOperationException {
+ return result;
+ }
+ }
+}