This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch parquet-1.13.x
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/parquet-1.13.x by this push:
     new e2c2499da PARQUET-2276: Bring back support for Hadoop 2.7.3 (#1084) (#1090)
e2c2499da is described below

commit e2c2499da64bcb69555bfdfe5c06b6e511f9fe73
Author: Fokko Driesprong <[email protected]>
AuthorDate: Tue May 9 09:17:54 2023 +0200

    PARQUET-2276: Bring back support for Hadoop 2.7.3 (#1084) (#1090)
    
    * Bring back support for Hadoop 2.7.3
    
    * Simplify the code
    
    * Fix the naming
    
    * Comments
---
 .../apache/parquet/hadoop/util/HadoopStreams.java  | 62 +++++++++++++++++++---
 pom.xml                                            |  2 +-
 2 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index bafb45ad3..fe7b4c5a8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.util.DynMethods;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +38,13 @@ public class HadoopStreams {
 
   private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final DynMethods.UnboundMethod hasCapabilitiesMethod =
+    new DynMethods
+      .Builder("hasCapabilities")
+      .impl(FSDataInputStream.class, "hasCapabilities", String.class)
+      .orNoop()
+      .build();
+
   /**
    * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
    * implementation for Parquet readers.
@@ -46,7 +54,39 @@ public class HadoopStreams {
    */
   public static SeekableInputStream wrap(FSDataInputStream stream) {
     Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-    if (isWrappedStreamByteBufferReadable(stream)) {
+
+    // Try to check using hasCapabilities(str)
+    Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream);
+
+    // If it is null, then fall back to the old method
+    if (hasCapabilitiesResult != null) {
+      if (hasCapabilitiesResult) {
+        return new H2SeekableInputStream(stream);
+      } else {
+        return new H1SeekableInputStream(stream);
+      }
+    }
+
+    return unwrapByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable
+   */
+  private static SeekableInputStream unwrapByteBufferReadableLegacy(FSDataInputStream stream) {
+    InputStream wrapped = stream.getWrappedStream();
+    if (wrapped instanceof FSDataInputStream) {
+      LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
+      return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped));
+    }
+    if (stream.getWrappedStream() instanceof ByteBufferReadable) {
       return new H2SeekableInputStream(stream);
     } else {
       return new H1SeekableInputStream(stream);
@@ -55,12 +95,12 @@ public class HadoopStreams {
 
   /**
    * Is the inner stream byte buffer readable?
-   * The test is "the stream is not FSDataInputStream
+   * The test is 'the stream is not FSDataInputStream
    * and implements ByteBufferReadable'
    *
    * That is: all streams which implement ByteBufferReadable
-   * other than FSDataInputStream successfuly support read(ByteBuffer).
-   * This is true for all filesytem clients the hadoop codebase.
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients the hadoop codebase.
    *
    * In hadoop 3.3.0+, the StreamCapabilities probe can be used to
    * check this: only those streams which provide the read(ByteBuffer)
@@ -68,10 +108,18 @@ public class HadoopStreams {
    * FSDataInputStream will pass the probe down to the underlying stream.
    *
    * @param stream stream to probe
-   * @return true if it is safe to a H2SeekableInputStream to access the data
+   * @return true if it is safe to a H2SeekableInputStream to access
+   * the data, null when it cannot be determined because of missing hasCapabilities
    */
-  private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
-    if (stream.hasCapability("in:readbytebuffer")) {
+  private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
+    if (hasCapabilitiesMethod.isNoop()) {
+      // When the method is not available, just return a null
+      return null;
+    }
+
+    boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer");
+
+    if (isByteBufferReadable) {
       // stream is issuing the guarantee that it implements the
       // API. Holds for all implementations in hadoop-*
       // since Hadoop 3.3.0 (HDFS-14111).
diff --git a/pom.xml b/pom.xml
index eb568e097..0fb655570 100644
--- a/pom.xml
+++ b/pom.xml
@@ -598,7 +598,7 @@
     <profile>
       <id>hadoop2</id>
       <properties>
-        <hadoop.version>2.9.2</hadoop.version>
+        <hadoop.version>2.7.3</hadoop.version>
       </properties>
     </profile>
 

Reply via email to