This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new b7e9f62 DRILL-7874: Ensure DrillFSDataInputStream.read populates byte array of the requested length
b7e9f62 is described below
commit b7e9f6294436dd382b547a51c8bc93d4e757a9af
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Thu Mar 4 19:38:34 2021 +0200
DRILL-7874: Ensure DrillFSDataInputStream.read populates byte array of the requested length
---
.../exec/store/dfs/DrillFSDataInputStream.java | 25 ++++++++++--
.../drill/exec/store/dfs/DrillFileSystem.java | 46 +++-------------------
.../drill/exec/store/pcap/decoder/Packet.java | 2 +-
3 files changed, 27 insertions(+), 46 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
index 46afd17..7c3c913 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.dfs;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.shaded.guava.com.google.common.io.ByteStreams;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
@@ -40,12 +41,12 @@ public class DrillFSDataInputStream extends FSDataInputStream {
private final OpenFileTracker openFileTracker;
private final OperatorStats operatorStats;
- public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException {
+ public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) {
this(in, operatorStats, null);
}
public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats,
- OpenFileTracker openFileTracker) throws IOException {
+ OpenFileTracker openFileTracker) {
super(new WrappedInputStream(in, operatorStats));
underlyingIs = in;
this.openFileTracker = openFileTracker;
@@ -213,7 +214,7 @@ public class DrillFSDataInputStream extends FSDataInputStream {
public int read(byte[] b, int off, int len) throws IOException {
operatorStats.startWait();
try {
- return is.read(b, off, len);
+ return readBytes(b, off, len);
} finally {
operatorStats.stopWait();
}
@@ -223,12 +224,28 @@ public class DrillFSDataInputStream extends FSDataInputStream {
public int read(byte[] b) throws IOException {
operatorStats.startWait();
try {
- return is.read(b);
+ return readBytes(b, 0, b.length);
} finally {
operatorStats.stopWait();
}
}
+ /**
+ * Reads up to {@code len} bytes of data from the input stream into an array of bytes.
+ * This method guarantees that regardless of the underlying stream implementation,
+ * the byte array will be populated with either {@code len} bytes or
+ * all bytes available in the stream if fewer than {@code len} remain.
+ */
+ private int readBytes(byte[] b, int off, int len) throws IOException {
+ int read = ByteStreams.read(is, b, off, len);
+ if (read == 0 && len > 0) {
+ // ByteStreams.read() doesn't return -1 at EOF; it returns 0
+ // when no bytes are available in the stream
+ return -1;
+ }
+ return read;
+ }
+
@Override
public long skip(long n) throws IOException {
return is.skip(n);
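A note for readers of this hunk: the plain java.io.InputStream.read(byte[], int, int) contract allows returning after filling only part of the buffer, even when more data is coming, which is what DRILL-7874 hit. Guava's ByteStreams.read closes that gap. The following is a minimal sketch of its looping behavior, illustrative only and not the shaded Guava source:

    import java.io.IOException;
    import java.io.InputStream;

    final class ReadFullySketch {
      // Roughly what ByteStreams.read(in, b, off, len) does: keep calling
      // InputStream.read until len bytes are copied or the stream hits EOF.
      // It returns the total number of bytes read (0 at immediate EOF, never
      // -1), which is why readBytes() above maps 0 back to -1.
      static int read(InputStream in, byte[] b, int off, int len) throws IOException {
        int total = 0;
        while (total < len) {
          int n = in.read(b, off + total, len - total);
          if (n == -1) {
            break; // EOF: return however many bytes were read so far
          }
          total += n;
        }
        return total;
      }
    }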
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 4f72699..0da534b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.dfs;
-import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -27,8 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import org.apache.commons.io.IOUtils;
-import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.util.AssertionUtil;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -788,51 +785,18 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
/**
* Returns an InputStream from a Hadoop path. If the data is compressed, this method will return a compressed
- * InputStream depending on the codec. Note that if the results of this method are sent to a third party parser
- * that works with bytes or individual characters directly, you should use the openDecompressedInputStream method.
+ * InputStream depending on the codec.
* @param path Input file path
* @return InputStream of opened file path
* @throws IOException If the file is unreachable, unavailable or otherwise unreadable
*/
public InputStream openPossiblyCompressedStream(Path path) throws IOException {
- CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
+ CompressionCodec codec = getCodec(path); // infers from file ext.
+ InputStream inputStream = open(path);
if (codec != null) {
- return codec.createInputStream(open(path));
- } else {
- return open(path);
+ inputStream = codec.createInputStream(inputStream);
}
- }
-
- /**
- * Returns a normal, decompressed InputStream. Some parsers, particularly those
- * that read raw bytes, generate errors when passed Hadoop ZipCompressed InputStreams.
- * This utility function wraps some of these functions so that a format plugin can be guaranteed
- * readable bytes.
- * @param path The file being read
- * @return Decompressed InputStream of the input file
- * @throws IOException If the file is unreadable or uses an unknown compression codec
- */
- public InputStream openDecompressedInputStream(Path path) throws IOException {
- CompressionCodec codec = getCodec(path);
- if (codec == null) {
- return open(path);
- } else {
- InputStream compressedStream = codec.createInputStream(open(path));
- byte[] bytes = IOUtils.toByteArray(compressedStream);
- AutoCloseables.closeSilently(compressedStream);
- return new ByteArrayInputStream(bytes);
- }
- }
-
- /**
- * There are parsers which require an uncompressed input stream to read the data
- * properly. This method helps identify whether the file being read is in fact compressed.
- * @param path The file being read
- * @return True if the file is compressed, false if not.
- */
- public boolean isCompressed(Path path) {
- CompressionCodec codec = codecFactory.getCodec(path);
- return codec != null;
+ return inputStream;
}
/**
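With openDecompressedInputStream and isCompressed removed, openPossiblyCompressedStream is the single entry point for plain and compressed files alike. A hypothetical caller, sketched under the assumption that fs is an open DrillFileSystem and the path is illustrative:

    // Hypothetical usage (not part of this commit). The codec is inferred
    // from the file extension, so the same call handles .csv and .csv.gz.
    static void readAll(DrillFileSystem fs) throws IOException {
      try (InputStream in = fs.openPossiblyCompressedStream(new Path("/data/example.csv.gz"))) {
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
          // process the first n bytes of buf
        }
      }
    }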
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index befa7d9..a1d188c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -467,7 +467,7 @@ public class Packet implements Comparable<Packet> {
originalLength = getIntFileOrder(byteOrder, header, offset + PacketConstants.ORIGINAL_LENGTH_OFFSET);
packetLength = getIntFileOrder(byteOrder, header, offset + PacketConstants.ACTUAL_LENGTH_OFFSET);
Preconditions.checkState(originalLength <= maxLength,
- "Packet too long (%d bytes)", originalLength);
+ "Packet too long (%s bytes)", originalLength);
}
private long getTimestampMicro(final byte[] header, final boolean byteOrder, final int offset) {
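The one-character Packet.java change deserves a note: Guava's Preconditions format messages with lenient %s-only substitution, so the old %d placeholder was never interpolated. A small illustration (the 70000 value is made up):

    // With %d, Guava's lenient formatter leaves the placeholder untouched
    // and appends the unmatched argument, yielding roughly:
    //   "Packet too long (%d bytes) [70000]"
    // With %s, the value is interpolated as intended:
    //   "Packet too long (70000 bytes)"
    Preconditions.checkState(originalLength <= maxLength,
        "Packet too long (%s bytes)", originalLength);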