This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new aa43670 DRILL-7834: Add Utility Functions for Compressed Files
aa43670 is described below
commit aa436704d8fcb2d812a80ab99a69d48917ec85c3
Author: Charles Givre <[email protected]>
AuthorDate: Mon Jan 4 10:27:09 2021 -0500
DRILL-7834: Add Utility Functions for Compressed Files
---
.../drill/exec/store/dfs/DrillFileSystem.java | 53 ++++++++++++++++++++++
1 file changed, 53 insertions(+)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 9380e41..18a6211 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.dfs;
+import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -26,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.io.IOUtils;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.util.AssertionUtil;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -783,6 +785,14 @@ public class DrillFileSystem extends FileSystem implements
OpenFileTracker {
underlyingFs.removeXAttr(path, name);
}
+ /**
+ * Returns an InputStream from a Hadoop path. If the data is compressed,
this method will return a compressed
+ * InputStream depending on the codec. Note that if the results of this
method are sent to a third party parser
+ * that works with bytes or individual characters directly, you should use
the openDecompressedInputStream method.
+ * @param path Input file path
+ * @return InputStream of opened file path
+ * @throws IOException If the file is unreachable, unavailable or otherwise
unreadable
+ */
public InputStream openPossiblyCompressedStream(Path path) throws
IOException {
CompressionCodec codec = codecFactory.getCodec(path); // infers from file
ext.
if (codec != null) {
@@ -791,6 +801,49 @@ public class DrillFileSystem extends FileSystem implements
OpenFileTracker {
return open(path);
}
}
+
+ /**
+ * Returns a normal, decompressed InputStream. Some parsers, particularly
those
+ * that read raw bytes, generate errors when passed Hadoop ZipCompressed
InputStreams.
+ * This utility function wraps some of these functions so that a format
plugin can be guaranteed
+ * readable bytes.
+ * @param path The file being read
+ * @return Decompressed InputStream of the input file
+ * @throws IOException If the file is unreadable or uses an unknown
compression codec
+ */
+ public InputStream openDecompressedInputStream(Path path) throws IOException
{
+ CompressionCodec codec = getCodec(path);
+ if (codec == null) {
+ return open(path);
+ } else {
+ InputStream compressedStream = codec.createInputStream(open(path));
+ byte[] bytes = IOUtils.toByteArray(compressedStream);
+ return new ByteArrayInputStream(bytes);
+ }
+ }
+
+ /**
+ * Reports whether a compression codec is resolved for the given file.
+ * Parsers that need an uncompressed input stream can use this to decide
+ * whether the file has to be decompressed before it is read.
+ * @param path The file being read
+ * @return True if a codec is resolved for the file (i.e. it is treated as
+ * compressed), false otherwise.
+ */
+ public boolean isCompressed(Path path) {
+   return codecFactory.getCodec(path) != null;
+ }
+
+ /**
+ * Looks up the {@link org.apache.hadoop.io.compress.CompressionCodec}
+ * that applies to a file. Callers can inspect the result to find out which
+ * kind of compression (if any) the file uses.
+ * @param path The file of unknown compression
+ * @return The CompressionCodec resolved for the file, or null if the file
+ * is not compressed.
+ */
+ public CompressionCodec getCodec(Path path) {
+   final CompressionCodec resolvedCodec = codecFactory.getCodec(path);
+   return resolvedCodec;
+ }
+
@Override
public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream) {
openedFiles.put(fsDataInputStream, new DebugStackTrace(path,
Thread.currentThread().getStackTrace()));