This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch parquet-1.13.x
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/parquet-1.13.x by this push:
new e2c2499da PARQUET-2276: Bring back support for Hadoop 2.7.3 (#1084)
(#1090)
e2c2499da is described below
commit e2c2499da64bcb69555bfdfe5c06b6e511f9fe73
Author: Fokko Driesprong <[email protected]>
AuthorDate: Tue May 9 09:17:54 2023 +0200
PARQUET-2276: Bring back support for Hadoop 2.7.3 (#1084) (#1090)
* Bring back support for Hadoop 2.7.3
* Simplify the code
* Fix the naming
* Comments
---
.../apache/parquet/hadoop/util/HadoopStreams.java | 62 +++++++++++++++++++---
pom.xml | 2 +-
2 files changed, 56 insertions(+), 8 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index bafb45ad3..fe7b4c5a8 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.util.DynMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,13 @@ public class HadoopStreams {
private static final Logger LOG =
LoggerFactory.getLogger(HadoopStreams.class);
+ private static final DynMethods.UnboundMethod hasCapabilitiesMethod =
+ new DynMethods
+ .Builder("hasCapabilities")
+ .impl(FSDataInputStream.class, "hasCapabilities", String.class)
+ .orNoop()
+ .build();
+
/**
* Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
* implementation for Parquet readers.
@@ -46,7 +54,39 @@ public class HadoopStreams {
*/
public static SeekableInputStream wrap(FSDataInputStream stream) {
Objects.requireNonNull(stream, "Cannot wrap a null input stream");
- if (isWrappedStreamByteBufferReadable(stream)) {
+
+ // Try to check using hasCapabilities(str)
+ Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream);
+
+ // If it is null, then fall back to the old method
+ if (hasCapabilitiesResult != null) {
+ if (hasCapabilitiesResult) {
+ return new H2SeekableInputStream(stream);
+ } else {
+ return new H1SeekableInputStream(stream);
+ }
+ }
+
+ return unwrapByteBufferReadableLegacy(stream);
+ }
+
+ /**
+ * Unwraps any nested FSDataInputStream layers and selects a seekable
+ * stream implementation based on whether the innermost wrapped stream
+ * implements ByteBufferReadable.
+ *
+ * This fallback is only used on Hadoop versions older than 2.9.x / 3.x.x
+ *
+ * @param stream stream to probe
+ * @return an H2SeekableInputStream if the innermost stream implements
+ * ByteBufferReadable, otherwise an H1SeekableInputStream
+ */
+ private static SeekableInputStream
unwrapByteBufferReadableLegacy(FSDataInputStream stream) {
+ InputStream wrapped = stream.getWrappedStream();
+ if (wrapped instanceof FSDataInputStream) {
+ LOG.debug("Checking on wrapped stream {} of {} whether is
ByteBufferReadable", wrapped, stream);
+ return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped));
+ }
+ if (stream.getWrappedStream() instanceof ByteBufferReadable) {
return new H2SeekableInputStream(stream);
} else {
return new H1SeekableInputStream(stream);
@@ -55,12 +95,12 @@ public class HadoopStreams {
/**
* Is the inner stream byte buffer readable?
- * The test is "the stream is not FSDataInputStream
+ * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
- * other than FSDataInputStream successfuly support read(ByteBuffer).
- * This is true for all filesytem clients the hadoop codebase.
+ * other than FSDataInputStream successfully support read(ByteBuffer).
+ * This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
@@ -68,10 +108,18 @@ public class HadoopStreams {
* FSDataInputStream will pass the probe down to the underlying stream.
*
* @param stream stream to probe
- * @return true if it is safe to a H2SeekableInputStream to access the data
+ * @return true if it is safe to use a H2SeekableInputStream to access
+ * the data, null when it cannot be determined because of missing
hasCapabilities
*/
- private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream
stream) {
- if (stream.hasCapability("in:readbytebuffer")) {
+ private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream
stream) {
+ if (hasCapabilitiesMethod.isNoop()) {
+ // When the method is not available, just return null
+ return null;
+ }
+
+ boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream,
"in:readbytebuffer");
+
+ if (isByteBufferReadable) {
// stream is issuing the guarantee that it implements the
// API. Holds for all implementations in hadoop-*
// since Hadoop 3.3.0 (HDFS-14111).
diff --git a/pom.xml b/pom.xml
index eb568e097..0fb655570 100644
--- a/pom.xml
+++ b/pom.xml
@@ -598,7 +598,7 @@
<profile>
<id>hadoop2</id>
<properties>
- <hadoop.version>2.9.2</hadoop.version>
+ <hadoop.version>2.7.3</hadoop.version>
</properties>
</profile>