[
https://issues.apache.org/jira/browse/PARQUET-2134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17712127#comment-17712127
]
ASF GitHub Bot commented on PARQUET-2134:
-----------------------------------------
sunchao commented on code in PR #951:
URL: https://github.com/apache/parquet-mr/pull/951#discussion_r1166131285
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##########
@@ -50,51 +46,45 @@ public class HadoopStreams {
*/
public static SeekableInputStream wrap(FSDataInputStream stream) {
Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-    if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
-        byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
-      try {
-        return h2SeekableConstructor.newInstance(stream);
-      } catch (InstantiationException | IllegalAccessException e) {
-        LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
-        return new H1SeekableInputStream(stream);
-      } catch (InvocationTargetException e) {
-        throw new ParquetDecodingException(
-            "Could not instantiate H2SeekableInputStream", e.getTargetException());
-      }
+    if (isWrappedStreamByteBufferReadable(stream)) {
+      return new H2SeekableInputStream(stream);
     } else {
       return new H1SeekableInputStream(stream);
     }
   }
-  private static Class<?> getReadableClass() {
-    try {
-      return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return null;
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is "the stream is not FSDataInputStream
+   * and implements ByteBufferReadable".
+   *
+   * That is: all streams which implement ByteBufferReadable
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients in the hadoop codebase.
+   *
+   * In hadoop 3.3.0+, the StreamCapabilities probe can be used to
+   * check this: only those streams which provide the read(ByteBuffer)
+   * semantics MAY return true for the probe "in:readbytebuffer";
+   * FSDataInputStream will pass the probe down to the underlying stream.
+   *
+   * @param stream stream to probe
+   * @return true if it is safe to use an H2SeekableInputStream to access the data
+   */
+  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
+    if (stream.hasCapability("in:readbytebuffer")) {
Review Comment:
also see related JIRA: https://issues.apache.org/jira/browse/PARQUET-2276
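
The recursive-unwrap idea behind this fix can be illustrated with a small, self-contained sketch. The types below (`ByteBufferReadable`, `WrapperStream`, `ReadableStream`) are simplified stand-ins invented for illustration, not the real Hadoop classes: the key point is that an `instanceof` check on the outermost wrapper is misleading, so the check must recurse to the innermost stream.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.InputStream;

public class UnwrapSketch {

    /** Stand-in for org.apache.hadoop.fs.ByteBufferReadable (marker only). */
    interface ByteBufferReadable {}

    /**
     * Stand-in for FSDataInputStream: a wrapper that advertises
     * ByteBufferReadable regardless of whether the stream it wraps
     * actually supports read(ByteBuffer).
     */
    static class WrapperStream extends FilterInputStream implements ByteBufferReadable {
        WrapperStream(InputStream in) { super(in); }
        InputStream getWrappedStream() { return in; }
    }

    /** A stream that genuinely supports byte-buffer reads. */
    static class ReadableStream extends ByteArrayInputStream implements ByteBufferReadable {
        ReadableStream() { super(new byte[0]); }
    }

    /**
     * Recursive check: peel off wrapper layers until we reach a stream
     * that is not a wrapper, then test that innermost stream.
     */
    static boolean isByteBufferReadable(InputStream stream) {
        if (stream instanceof WrapperStream) {
            return isByteBufferReadable(((WrapperStream) stream).getWrappedStream());
        }
        return stream instanceof ByteBufferReadable;
    }

    public static void main(String[] args) {
        InputStream plain = new ByteArrayInputStream(new byte[0]);
        WrapperStream overPlain = new WrapperStream(plain);
        WrapperStream overReadable = new WrapperStream(new ReadableStream());

        // The naive check on the wrapper itself always says true:
        System.out.println(overPlain instanceof ByteBufferReadable);   // true (misleading)
        // The recursive check reports what the innermost stream supports:
        System.out.println(isByteBufferReadable(overPlain));           // false
        System.out.println(isByteBufferReadable(overReadable));        // true
    }
}
```

This mirrors why the double-wrapped `CustomDataInputStream` case in the issue description fails: the outer wrapper claims the capability, but the innermost stream throws `UnsupportedOperationException` on byte-buffer reads.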
> Incorrect type checking in HadoopStreams.wrap
> ---------------------------------------------
>
> Key: PARQUET-2134
> URL: https://issues.apache.org/jira/browse/PARQUET-2134
> Project: Parquet
> Issue Type: Bug
> Components: parquet-mr
> Affects Versions: 1.8.3, 1.10.1, 1.11.2, 1.12.2
> Reporter: Todd Gao
> Priority: Minor
> Fix For: 1.13.0
>
>
> The method
> [HadoopStreams.wrap|https://github.com/apache/parquet-mr/blob/4d062dc37577e719dcecc666f8e837843e44a9be/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L51]
> wraps an FSDataInputStream to a SeekableInputStream.
> It checks whether the underlying stream of the passed FSDataInputStream
> implements ByteBufferReadable: if so, it wraps the FSDataInputStream in an
> H2SeekableInputStream; otherwise, in an H1SeekableInputStream.
> In some cases, we may add another wrapper over FSDataInputStream. For
> example,
> {code:java}
> class CustomDataInputStream extends FSDataInputStream {
>     public CustomDataInputStream(FSDataInputStream original) {
>         super(original);
>     }
> }
> {code}
> When we create an FSDataInputStream whose underlying stream does not
> implement ByteBufferReadable, and then create a CustomDataInputStream with
> it, using HadoopStreams.wrap to build a SeekableInputStream may fail with
> an error like
> {quote}java.lang.UnsupportedOperationException: Byte-buffer read unsupported
> by input stream{quote}
> We can fix this by recursively checking the underlying stream of the
> FSDataInputStream.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)