[ https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718680#comment-17718680 ]
ASF GitHub Bot commented on PARQUET-2276:
-----------------------------------------
rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1182873931
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##########
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
public static SeekableInputStream wrap(FSDataInputStream stream) {
Objects.requireNonNull(stream, "Cannot wrap a null input stream");
- if (isWrappedStreamByteBufferReadable(stream)) {
- return new H2SeekableInputStream(stream);
- } else {
- return new H1SeekableInputStream(stream);
+
+ // Try to check using hasCapabilities(str)
+ Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+ // If it is null, then fall back to the old method
+ if (hasCapabilitiesResult != null) {
+ if (hasCapabilitiesResult) {
+ return new H2SeekableInputStream(stream);
+ } else {
+ return new H1SeekableInputStream(stream);
+ }
+ }
+
+ return isWrappedStreamByteBufferReadableLegacy(stream);
+ }
+
+ /**
+ * Is the inner stream byte buffer readable?
+ * The test is 'the stream is not FSDataInputStream
+ * and implements ByteBufferReadable'
+ *
+ * This logic is only used for Hadoop <2.9.x, and <3.x.x
+ *
+ * @param stream stream to probe
+ * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable
+ */
+ private static SeekableInputStream isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+ InputStream wrapped = stream.getWrappedStream();
+ if (wrapped instanceof FSDataInputStream) {
+ LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
+ return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) wrapped));
+ }
+ if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+ byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+ try {
+ return h2SeekableConstructor.newInstance(stream);
+ } catch (InstantiationException | IllegalAccessException e) {
+ LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
+ } catch (InvocationTargetException e) {
+ throw new ParquetDecodingException(
+ "Could not instantiate H2SeekableInputStream", e.getTargetException());
+ }
}
+ return new H1SeekableInputStream(stream);
}
/**
* Is the inner stream byte buffer readable?
- * The test is "the stream is not FSDataInputStream
+ * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
- * other than FSDataInputStream successfuly support read(ByteBuffer).
- * This is true for all filesytem clients the hadoop codebase.
+ * other than FSDataInputStream successfully support read(ByteBuffer).
+ * This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
* semantics MAY return true for the probe "in:readbytebuffer";
* FSDataInputStream will pass the probe down to the underlying stream.
*
* @param stream stream to probe
- * @return true if it is safe to a H2SeekableInputStream to access the data
+ * @return true if it is safe to a H2SeekableInputStream to access
+ * the data, null when it cannot be determined
*/
- private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
- if (stream.hasCapability("in:readbytebuffer")) {
- // stream is issuing the guarantee that it implements the
- // API. Holds for all implementations in hadoop-*
- // since Hadoop 3.3.0 (HDFS-14111).
- return true;
+ private static Boolean isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) {
+ Method methodHasCapabilities;
+ try {
+ methodHasCapabilities = stream.getClass().getMethod("hasCapability", String.class);
Review Comment:
You can use DynMethods to get an unbound `hasCapability` method. That can be
done statically, so all you need to do is check whether it is present and call
it.
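The suggestion above refers to parquet-common's `DynMethods`, which resolves a method once (for example in a static initializer) into an unbound handle that callers can probe for presence and invoke. As a rough, self-contained illustration of that resolve-once pattern using plain `java.lang.reflect` — the `CapabilityProbe` class and the use of `String.contains` as a stand-in for `FSDataInputStream.hasCapability` are invented for this sketch, not part of the patch:

```java
import java.lang.reflect.Method;

public class CapabilityProbe {
  // Resolved once, statically; null when the method is absent
  // on this classpath (as hasCapability is on Hadoop 2.8.x).
  private static final Method HAS_CAPABILITY = findHasCapability();

  private static Method findHasCapability() {
    try {
      // Illustrative stand-in: probing String.contains avoids a
      // Hadoop dependency while showing the same lookup shape.
      return String.class.getMethod("contains", CharSequence.class);
    } catch (NoSuchMethodException e) {
      return null; // method not available: signal the caller to fall back
    }
  }

  /** Returns null when the capability cannot be determined. */
  public static Boolean hasCapability(String target, String capability) {
    if (HAS_CAPABILITY == null) {
      return null; // caller falls back to the legacy instanceof-based checks
    }
    try {
      return (Boolean) HAS_CAPABILITY.invoke(target, capability);
    } catch (ReflectiveOperationException e) {
      return null;
    }
  }
}
```

With `DynMethods` the lookup and null-handling boilerplate collapses into a builder call plus an `isNoop()`-style presence check, which is what the reviewer is pointing at.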
> ParquetReader reads do not work with Hadoop version 2.8.5
> ---------------------------------------------------------
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
> Issue Type: Bug
> Components: parquet-mr
> Affects Versions: 1.13.0
> Reporter: Atul Mohan
> Assignee: Fokko Driesprong
> Priority: Major
> Fix For: 1.14.0, 1.13.1
>
>
> {{ParquetReader.read() fails with the following exception on parquet-mr version 1.13.0 when using hadoop version 2.8.5:}}
> {code:java}
> java.lang.NoSuchMethodError: 'boolean org.apache.hadoop.fs.FSDataInputStream.hasCapability(java.lang.String)'
> at org.apache.parquet.hadoop.util.HadoopStreams.isWrappedStreamByteBufferReadable(HadoopStreams.java:74)
> at org.apache.parquet.hadoop.util.HadoopStreams.wrap(HadoopStreams.java:49)
> at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
> at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:787)
> at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162)
> at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
> {code}
>
> From an initial investigation, it looks like HadoopStreams has started using
> [FSDataInputStream.hasCapability|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L74],
> but _FSDataInputStream_ does not have the _hasCapability_ API in [Hadoop
> 2.8.x|https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FSDataInputStream.html].
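The fix referenced above (PR #1084) amounts to making the capability probe tri-state, so that on Hadoop 2.8.x the missing `hasCapability` method is never invoked directly. A schematic sketch of that decision flow — the class and enum names here are illustrative, not taken from the patch:

```java
public class WrapDecision {
  enum Choice { H2_SEEKABLE, H1_SEEKABLE, LEGACY_PROBE }

  // Mirrors the tri-state logic in HadoopStreams.wrap(): a definite
  // capability answer selects the stream wrapper directly, while an
  // unknown (null) answer defers to the legacy reflection-based probe.
  static Choice choose(Boolean hasCapabilitiesResult) {
    if (hasCapabilitiesResult != null) {
      return hasCapabilitiesResult ? Choice.H2_SEEKABLE : Choice.H1_SEEKABLE;
    }
    return Choice.LEGACY_PROBE;
  }
}
```

The null branch is what restores Hadoop 2.8.5 compatibility: when `hasCapability` cannot even be looked up, the wrapper falls back to the pre-1.13.0 `instanceof ByteBufferReadable` checks instead of throwing `NoSuchMethodError`.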
--
This message was sent by Atlassian Jira
(v8.20.10#820010)