Repository: apex-malhar
Updated Branches:
  refs/heads/master 8eb750053 -> c128091c7

APEXMALHAR-2487 Added support for Snappy compression in FilterStreamProvider, which in turn enables Snappy output.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3530fd93
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3530fd93
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3530fd93

Branch: refs/heads/master
Commit: 3530fd93a98cf027c0cc76060ead3d9cf7852cb6
Parents: 2fe2903
Author: Ilya Ganelin <[email protected]>
Authored: Fri Apr 28 21:54:06 2017 -0700
Committer: Ilya Ganelin <[email protected]>
Committed: Tue May 9 09:37:37 2017 -0700

----------------------------------------------------------------------
 .../lib/io/fs/FilterStreamCodec.java            |  83 +++++++++-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 166 +++++++++++++++++--
 2 files changed, 235 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3530fd93/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
index baf2297..cdef3b6 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.lib.io.fs;
 
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.zip.GZIPOutputStream;
@@ -25,6 +26,11 @@ import java.util.zip.GZIPOutputStream;
 import javax.crypto.Cipher;
 import javax.crypto.CipherOutputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+
 /**
  * Filters for compression and encryption.
  *
@@ -99,7 +105,8 @@ public class FilterStreamCodec
     @Override
     public FilterStreamContext<CipherOutputStream> getFilterStreamContext(OutputStream outputStream) throws IOException
     {
-      return new FilterStreamContext.SimpleFilterStreamContext<CipherOutputStream>(new CipherOutputStream(outputStream, cipher));
+      return new FilterStreamContext.SimpleFilterStreamContext<CipherOutputStream>(
+          new CipherOutputStream(outputStream, cipher));
     }
 
     @Override
@@ -108,4 +115,78 @@ public class FilterStreamCodec
     }
   }
 
+
+  public static class SnappyFilterStream extends FilterOutputStream
+  {
+    /**
+     * Creates an output stream filter built on top of the specified
+     * underlying output stream.
+     *
+     * @param out the underlying output stream to be assigned to
+     *            the field <tt>this.out</tt> for later use, or
+     *            <code>null</code> if this instance is to be
+     *            created without an underlying stream.
+     */
+    public SnappyFilterStream(CompressionOutputStream out)
+    {
+      super(out);
+    }
+
+    public void finish() throws IOException
+    {
+      ((CompressionOutputStream)out).finish();
+    }
+  }
+
+  public static class SnappyFilterStreamContext extends FilterStreamContext.BaseFilterStreamContext<SnappyFilterStream>
+  {
+    private int bufferSize = 256 * 1024;
+
+    public void setBufferSize(int bufferSize)
+    {
+      this.bufferSize = bufferSize;
+    }
+
+    public SnappyFilterStreamContext(OutputStream outputStream) throws IOException
+    {
+      SnappyCodec codec = new SnappyCodec();
+      codec.setConf(new Configuration());
+      try {
+        filterStream = new SnappyFilterStream(
+            codec.createOutputStream(outputStream, new SnappyCompressor(bufferSize)));
+      } catch (IOException e) {
+        throw e;
+      }
+    }
+
+    @Override
+    public void finalizeContext() throws IOException
+    {
+      try {
+        filterStream.finish();
+      } catch (IOException e) {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * A provider for Snappy filter
+   */
+  public static class SnappyFilterStreamProvider implements FilterStreamProvider<SnappyFilterStream,
+      OutputStream>
+  {
+    @Override
+    public FilterStreamContext<SnappyFilterStream> getFilterStreamContext(OutputStream outputStream)
+        throws IOException
+    {
+      return new SnappyFilterStreamContext(outputStream);
+    }
+
+    @Override
+    public void reclaimFilterStreamContext(FilterStreamContext<SnappyFilterStream> filterStreamContext)
+    {
+
+    }
+  }
 }


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3530fd93/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 472aa93..ceb1e7d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -18,10 +18,12 @@
  */
 package com.datatorrent.lib.io.fs;
 
+import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.FilterOutputStream;
 import java.io.IOException;
@@ -55,6 +57,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
@@ -74,6 +80,7 @@ import com.datatorrent.lib.util.TestUtils.TestInfo;
 import com.datatorrent.netlet.util.DTThrowable;
 
 import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+import static org.junit.Assert.assertEquals;
 
 public class AbstractFileOutputOperatorTest
 {
@@ -1585,20 +1592,9 @@ public class AbstractFileOutputOperatorTest
     Assert.assertEquals("Part file names", fileNames, getFileNames(files));
   }
 
-  @Test
-  public void testCompression() throws IOException
+  private void writeCompressedData(EvenOddHDFSExactlyOnceWriter writer, File evenFile,
+      File oddFile, List<Long> evenOffsets, List<Long> oddOffsets)
   {
-    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
-    writer.setFilterStreamProvider(new FilterStreamCodec.GZipFilterStreamProvider());
-
-    File evenFile = new File(testMeta.getDir(), EVEN_FILE);
-    File oddFile = new File(testMeta.getDir(), ODD_FILE);
-
-    // To get around the multi member gzip issue with openjdk
-    // http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425
-    List<Long> evenOffsets = new ArrayList<Long>();
-    List<Long> oddOffsets = new ArrayList<Long>();
-
     writer.setFilePath(testMeta.getDir());
     writer.setAlwaysWriteToTmp(false);
     writer.setup(testMeta.testOperatorContext);
@@ -1617,12 +1613,101 @@ public class AbstractFileOutputOperatorTest
     }
 
     writer.teardown();
+  }
+
+
+  @Test
+  public void testGzipCompression() throws IOException
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    writer.setFilterStreamProvider(new FilterStreamCodec.GZipFilterStreamProvider());
+
+    File evenFile = new File(testMeta.getDir(), EVEN_FILE);
+    File oddFile = new File(testMeta.getDir(), ODD_FILE);
+
+    // To get around the multi member gzip issue with openjdk
+    // http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425
+    List<Long> evenOffsets = new ArrayList<Long>();
+    List<Long> oddOffsets = new ArrayList<Long>();
+
+    writeCompressedData(writer, evenFile, oddFile, evenOffsets, oddOffsets);
 
     checkCompressedFile(evenFile, evenOffsets, 0, 5, 1000, null, null);
     checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, null, null);
   }
 
   @Test
+  public void testSnappyStreamProvider() throws IOException
+  {
+    if (checkNativeSnappy()) {
+      return;
+    }
+
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    writer.setFilterStreamProvider(new FilterStreamCodec.SnappyFilterStreamProvider());
+
+    File evenFile = new File(testMeta.getDir(), EVEN_FILE);
+    File oddFile = new File(testMeta.getDir(), ODD_FILE);
+
+    // To get around the multi member gzip issue with openjdk
+    // http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425
+    List<Long> evenOffsets = new ArrayList<Long>();
+    List<Long> oddOffsets = new ArrayList<Long>();
+
+    writeCompressedData(writer, evenFile, oddFile, evenOffsets, oddOffsets);
+
+    checkSnappyFile(evenFile, evenOffsets, 0, 5, 1000);
+    checkSnappyFile(oddFile, oddOffsets, 1, 5, 1000);
+  }
+
+  private boolean checkNativeSnappy()
+  {
+    try {
+      SnappyCodec.checkNativeCodeLoaded();
+    } catch (UnsatisfiedLinkError u) {
+      LOG.error("WARNING: Skipping Snappy compression test since native libraries were not found.");
+      return true;
+    } catch (RuntimeException e) {
+      LOG.error("WARNING: Skipping Snappy compression test since native libraries were not found.");
+      return true;
+    }
+    return false;
+  }
+
+
+  @Test
+  public void testSnappyCompressionSimple() throws IOException
+  {
+    if (checkNativeSnappy()) {
+      return;
+    }
+
+    File snappyFile = new File(testMeta.getDir(), "snappyTestFile.snappy");
+
+    BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(snappyFile));
+    Configuration conf = new Configuration();
+    CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
+    FilterStreamCodec.SnappyFilterStream filterStream = new FilterStreamCodec.SnappyFilterStream(
+        codec.createOutputStream(os));
+
+    int ONE_MB = 1024 * 1024;
+
+    String testStr = "TestSnap-16bytes";
+    for (int i = 0; i < ONE_MB; i++) { // write 16 MBs
+      filterStream.write(testStr.getBytes());
+    }
+    filterStream.flush();
+    filterStream.close();
+
+    CompressionInputStream is = codec.createInputStream(new FileInputStream(snappyFile));
+
+    byte[] recovered = new byte[testStr.length()];
+    int bytesRead = is.read(recovered);
+    is.close();
+    assertEquals(testStr, new String(recovered));
+  }
+
+  @Test
   public void testRecoveryOfOpenFiles()
   {
     EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
@@ -1753,6 +1838,61 @@ public class AbstractFileOutputOperatorTest
     Assert.assertEquals("Total", totalWindows, numWindows);
   }
 
+  private void checkSnappyFile(File file, List<Long> offsets, int startVal, int totalWindows, int totalRecords) throws IOException
+  {
+    FileInputStream fis;
+    InputStream gss = null;
+    Configuration conf = new Configuration();
+    CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
+    CompressionInputStream snappyIs = null;
+
+    BufferedReader br = null;
+
+    int numWindows = 0;
+    try {
+      fis = new FileInputStream(file);
+      gss = fis;
+
+      long startOffset = 0;
+      for (long offset : offsets) {
+        // Skip initial case in case file is not yet created
+        if (offset == 0) {
+          continue;
+        }
+        long limit = offset - startOffset;
+        LimitInputStream lis = new LimitInputStream(gss, limit);
+
+        snappyIs = codec.createInputStream(lis);
+        br = new BufferedReader(new InputStreamReader(snappyIs));
+        String eline = "" + (startVal + numWindows * 2);
+        int count = 0;
+        String line;
+        while ((line = br.readLine()) != null) {
+          Assert.assertEquals("File line", eline, line);
+          ++count;
+          if ((count % totalRecords) == 0) {
+            ++numWindows;
+            eline = "" + (startVal + numWindows * 2);
+          }
+        }
+        startOffset = offset;
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (br != null) {
+        br.close();
+      } else {
+        if (snappyIs != null) {
+          snappyIs.close();
+        } else if (gss != null) {
+          gss.close();
+        }
+      }
+    }
+    Assert.assertEquals("Total", totalWindows, numWindows);
+  }
+
   @Test
   public void testChainFilters() throws NoSuchAlgorithmException, IOException
   {
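
----------------------------------------------------------------------

Usage note: the sketch below shows how the new provider might be attached to a concrete AbstractFileOutputOperator, mirroring the writer setup in the tests above. The operator class, output file name, and newline framing are hypothetical illustrations; only setFilterStreamProvider() and FilterStreamCodec.SnappyFilterStreamProvider come from this change, and native Snappy libraries must be loadable (the tests guard on this via checkNativeSnappy()).

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.datatorrent.lib.io.fs.FilterStreamCodec;

// Hypothetical operator: writes each String tuple as one line into a
// Snappy-compressed output file. Only setFilterStreamProvider() and
// SnappyFilterStreamProvider are part of this commit.
public class SnappyLineOutputOperator extends AbstractFileOutputOperator<String>
{
  public SnappyLineOutputOperator()
  {
    // Wrap every stream the operator opens in a Snappy CompressionOutputStream.
    setFilterStreamProvider(new FilterStreamCodec.SnappyFilterStreamProvider());
  }

  @Override
  protected String getFileName(String tuple)
  {
    return "lines.snappy"; // single logical output file for this sketch
  }

  @Override
  protected byte[] getBytesForTuple(String tuple)
  {
    return (tuple + "\n").getBytes();
  }
}

The rest of the operator wiring (setFilePath() and DAG setup) is unchanged; compression stays transparent to the operator because the provider only swaps the underlying output stream.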

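Read-back sketch: to inspect the emitted files outside the test harness, the same Hadoop codec the tests use can decompress them. This is a minimal sketch assuming native Snappy is available; the file path is a placeholder.

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SnappyReadBack
{
  public static void main(String[] args) throws IOException
  {
    // Same codec construction as in AbstractFileOutputOperatorTest.
    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(SnappyCodec.class, conf);

    // Placeholder path; point this at a file written by the operator.
    try (CompressionInputStream in =
        codec.createInputStream(new FileInputStream("/tmp/snappy-out/lines.snappy"))) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        System.out.write(buf, 0, n); // raw decompressed bytes to stdout
      }
    }
  }
}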