Repository: apex-malhar
Updated Branches:
  refs/heads/master 8eb750053 -> c128091c7


APEXMALHAR-2487 Added support for Snappy compression in FilterStreamProvider, 
which in turn enables Snappy output.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3530fd93
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3530fd93
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3530fd93

Branch: refs/heads/master
Commit: 3530fd93a98cf027c0cc76060ead3d9cf7852cb6
Parents: 2fe2903
Author: Ilya Ganelin <[email protected]>
Authored: Fri Apr 28 21:54:06 2017 -0700
Committer: Ilya Ganelin <[email protected]>
Committed: Tue May 9 09:37:37 2017 -0700

----------------------------------------------------------------------
 .../lib/io/fs/FilterStreamCodec.java            |  83 +++++++++-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 166 +++++++++++++++++--
 2 files changed, 235 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
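For readers wanting to use the new capability: wiring the provider into a file
output operator follows the same pattern the updated tests use for GZip, i.e.
set the provider on the operator before setup. A minimal sketch, where
LineWriter stands in for any concrete AbstractFileOutputOperator subclass
(hypothetical name; only setFilterStreamProvider and SnappyFilterStreamProvider
come from this change):

    // Enable Snappy-compressed output on a file output operator.
    // LineWriter is a hypothetical AbstractFileOutputOperator subclass.
    LineWriter writer = new LineWriter();
    writer.setFilePath("/tmp/snappy-out"); // hypothetical output directory
    writer.setAlwaysWriteToTmp(false);
    writer.setFilterStreamProvider(new FilterStreamCodec.SnappyFilterStreamProvider());

After setup, every part file the operator rolls is written through the Snappy
codec, analogous to the existing GZip provider.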


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3530fd93/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
index baf2297..cdef3b6 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.lib.io.fs;
 
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.zip.GZIPOutputStream;
@@ -25,6 +26,11 @@ import java.util.zip.GZIPOutputStream;
 import javax.crypto.Cipher;
 import javax.crypto.CipherOutputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+
 /**
  * Filters for compression and encryption.
  *
@@ -99,7 +105,8 @@ public class FilterStreamCodec
     @Override
     public FilterStreamContext<CipherOutputStream> getFilterStreamContext(OutputStream outputStream) throws IOException
     {
-      return new FilterStreamContext.SimpleFilterStreamContext<CipherOutputStream>(new CipherOutputStream(outputStream, cipher));
+      return new FilterStreamContext.SimpleFilterStreamContext<CipherOutputStream>(
+          new CipherOutputStream(outputStream, cipher));
     }
 
     @Override
@@ -108,4 +115,78 @@ public class FilterStreamCodec
 
     }
   }
+
+  public static class SnappyFilterStream extends FilterOutputStream
+  {
+    /**
+     * Creates an output stream filter built on top of the specified
+     * underlying output stream.
+     *
+     * @param out the underlying compression output stream to be assigned
+     *            to the field <tt>this.out</tt> for later use
+     */
+    public SnappyFilterStream(CompressionOutputStream out)
+    {
+      super(out);
+    }
+
+    /**
+     * Finishes writing compressed data to the underlying stream without
+     * closing it, so the stream can continue to be written to.
+     */
+    public void finish() throws IOException
+    {
+      ((CompressionOutputStream)out).finish();
+    }
+  }
+
+  public static class SnappyFilterStreamContext extends FilterStreamContext.BaseFilterStreamContext<SnappyFilterStream>
+  {
+    // Compressor buffer size; fixed at construction time, since the codec's
+    // output stream is created in the constructor a setter would have no effect.
+    private final int bufferSize = 256 * 1024;
+
+    public SnappyFilterStreamContext(OutputStream outputStream) throws IOException
+    {
+      SnappyCodec codec = new SnappyCodec();
+      codec.setConf(new Configuration());
+      filterStream = new SnappyFilterStream(
+          codec.createOutputStream(outputStream, new SnappyCompressor(bufferSize)));
+    }
+
+    @Override
+    public void finalizeContext() throws IOException
+    {
+      filterStream.finish();
+    }
+  }
+
+  /**
+   * A provider for Snappy filter streams.
+   */
+  public static class SnappyFilterStreamProvider implements FilterStreamProvider<SnappyFilterStream, OutputStream>
+  {
+    @Override
+    public FilterStreamContext<SnappyFilterStream> getFilterStreamContext(OutputStream outputStream)
+        throws IOException
+    {
+      return new SnappyFilterStreamContext(outputStream);
+    }
+
+    @Override
+    public void reclaimFilterStreamContext(FilterStreamContext<SnappyFilterStream> filterStreamContext)
+    {
+
+    }
+  }
 }
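A note on the design: SnappyFilterStream exists mainly to surface
CompressionOutputStream.finish(), which flushes any buffered compressed data
without closing the underlying stream; finalizeContext() invokes it so the
data written so far forms a complete, independently decodable Snappy stream.
Used standalone, the context looks roughly like this — a sketch, assuming the
getFilterStream() accessor from the FilterStreamContext interface and native
Snappy on the library path:

    // Hypothetical standalone use of the new context, outside an operator.
    OutputStream fileOut = new FileOutputStream("/tmp/demo.snappy"); // assumed path
    FilterStreamCodec.SnappyFilterStreamContext ctx =
        new FilterStreamCodec.SnappyFilterStreamContext(fileOut);
    ctx.getFilterStream().write("hello snappy".getBytes());
    ctx.finalizeContext(); // finish() flushes the compressor but leaves fileOut open
    fileOut.close();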

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3530fd93/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 472aa93..ceb1e7d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -18,10 +18,12 @@
  */
 package com.datatorrent.lib.io.fs;
 
+import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.FilterOutputStream;
 import java.io.IOException;
@@ -55,6 +57,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
@@ -74,6 +80,7 @@ import com.datatorrent.lib.util.TestUtils.TestInfo;
 import com.datatorrent.netlet.util.DTThrowable;
 
 import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+import static org.junit.Assert.assertEquals;
 
 public class AbstractFileOutputOperatorTest
 {
@@ -1585,20 +1592,9 @@ public class AbstractFileOutputOperatorTest
     Assert.assertEquals("Part file names", fileNames, getFileNames(files));
   }
 
-  @Test
-  public void testCompression() throws IOException
+  private void writeCompressedData(EvenOddHDFSExactlyOnceWriter writer, File evenFile,
+      File oddFile, List<Long> evenOffsets, List<Long> oddOffsets)
   {
-    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
-    writer.setFilterStreamProvider(new FilterStreamCodec.GZipFilterStreamProvider());
-
-    File evenFile = new File(testMeta.getDir(), EVEN_FILE);
-    File oddFile = new File(testMeta.getDir(), ODD_FILE);
-
-    // To get around the multi member gzip issue with openjdk
-    // http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425
-    List<Long> evenOffsets = new ArrayList<Long>();
-    List<Long> oddOffsets = new ArrayList<Long>();
-
     writer.setFilePath(testMeta.getDir());
     writer.setAlwaysWriteToTmp(false);
     writer.setup(testMeta.testOperatorContext);
@@ -1617,12 +1613,101 @@ public class AbstractFileOutputOperatorTest
     }
 
     writer.teardown();
+  }
+
+  @Test
+  public void testGzipCompression() throws IOException
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    writer.setFilterStreamProvider(new FilterStreamCodec.GZipFilterStreamProvider());
+
+    File evenFile = new File(testMeta.getDir(), EVEN_FILE);
+    File oddFile = new File(testMeta.getDir(), ODD_FILE);
+
+    // To get around the multi member gzip issue with openjdk
+    // http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425
+    List<Long> evenOffsets = new ArrayList<Long>();
+    List<Long> oddOffsets = new ArrayList<Long>();
+
+    writeCompressedData(writer, evenFile, oddFile, evenOffsets, oddOffsets);
 
     checkCompressedFile(evenFile, evenOffsets, 0, 5, 1000, null, null);
     checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, null, null);
   }
 
   @Test
+  public void testSnappyStreamProvider() throws IOException
+  {
+    if (checkNativeSnappy()) {
+      return;
+    }
+
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    writer.setFilterStreamProvider(new FilterStreamCodec.SnappyFilterStreamProvider());
+
+    File evenFile = new File(testMeta.getDir(), EVEN_FILE);
+    File oddFile = new File(testMeta.getDir(), ODD_FILE);
+
+    // Track per-window offsets so that each window's Snappy stream can be
+    // decompressed independently
+    List<Long> evenOffsets = new ArrayList<Long>();
+    List<Long> oddOffsets = new ArrayList<Long>();
+
+    writeCompressedData(writer, evenFile, oddFile, evenOffsets, oddOffsets);
+
+    checkSnappyFile(evenFile, evenOffsets, 0, 5, 1000);
+    checkSnappyFile(oddFile, oddOffsets, 1, 5, 1000);
+  }
+
+  private boolean checkNativeSnappy()
+  {
+    try {
+      SnappyCodec.checkNativeCodeLoaded();
+    } catch (UnsatisfiedLinkError | RuntimeException e) {
+      LOG.warn("Skipping Snappy compression test since native libraries were not found.");
+      return true;
+    }
+    return false;
+  }
+
+  @Test
+  public void testSnappyCompressionSimple() throws IOException
+  {
+    if (checkNativeSnappy()) {
+      return;
+    }
+
+    File snappyFile = new File(testMeta.getDir(), "snappyTestFile.snappy");
+
+    BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(snappyFile));
+    Configuration conf = new Configuration();
+    CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
+    FilterStreamCodec.SnappyFilterStream filterStream = new FilterStreamCodec.SnappyFilterStream(
+        codec.createOutputStream(os));
+
+    int ONE_MB = 1024 * 1024;
+
+    String testStr = "TestSnap-16bytes";
+    for (int i = 0; i < ONE_MB; i++) { // writes 16 MB in total
+      filterStream.write(testStr.getBytes());
+    }
+    filterStream.flush();
+    filterStream.close();
+
+    CompressionInputStream is = codec.createInputStream(new FileInputStream(snappyFile));
+
+    byte[] recovered = new byte[testStr.length()];
+    int bytesRead = is.read(recovered);
+    is.close();
+    assertEquals(testStr.length(), bytesRead);
+    assertEquals(testStr, new String(recovered));
+  }
+
+  @Test
   public void testRecoveryOfOpenFiles()
   {
     EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
@@ -1753,6 +1838,61 @@ public class AbstractFileOutputOperatorTest
     Assert.assertEquals("Total", totalWindows, numWindows);
   }
 
+  private void checkSnappyFile(File file, List<Long> offsets, int startVal, int totalWindows,
+      int totalRecords) throws IOException
+  {
+    FileInputStream fis;
+    InputStream gss = null;
+    Configuration conf = new Configuration();
+    CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
+    CompressionInputStream snappyIs = null;
+
+    BufferedReader br = null;
+
+    int numWindows = 0;
+    try {
+      fis = new FileInputStream(file);
+      gss = fis;
+
+      long startOffset = 0;
+      for (long offset : offsets) {
+        // Skip the initial zero offset recorded before the file was created
+        if (offset == 0) {
+          continue;
+        }
+        long limit = offset - startOffset;
+        LimitInputStream lis = new LimitInputStream(gss, limit);
+
+        snappyIs = codec.createInputStream(lis);
+        br = new BufferedReader(new InputStreamReader(snappyIs));
+        String eline = "" + (startVal + numWindows * 2);
+        int count = 0;
+        String line;
+        while ((line = br.readLine()) != null) {
+          Assert.assertEquals("File line", eline, line);
+          ++count;
+          if ((count % totalRecords) == 0) {
+            ++numWindows;
+            eline = "" + (startVal + numWindows * 2);
+          }
+        }
+        startOffset = offset;
+      }
+    } catch (Exception e) {
+      DTThrowable.rethrow(e);
+    } finally {
+      if (br != null) {
+        br.close();
+      } else {
+        if (snappyIs != null) {
+          snappyIs.close();
+        } else if (gss != null) {
+          gss.close();
+        }
+      }
+    }
+    Assert.assertEquals("Total", totalWindows, numWindows);
+  }
+
   @Test
   public void testChainFilters() throws NoSuchAlgorithmException, IOException
   {

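One consequence visible in checkSnappyFile above: each finalized window is its
own Snappy stream, so a rolled file cannot be decompressed in a single pass; a
reader must bound every block by the offsets captured at write time. A
consumer-side sketch under those assumptions (the file name and offsets list
are placeholders; ByteStreams.limit is Guava's equivalent of the
LimitInputStream the test uses):

    // Decompress a rolled Snappy file block by block.
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
    FileInputStream fis = new FileInputStream("even.snappy"); // placeholder file
    long start = 0;
    for (long offset : offsets) { // per-window offsets recorded by the writer
      InputStream block = ByteStreams.limit(fis, offset - start); // bound this window's stream
      BufferedReader br = new BufferedReader(new InputStreamReader(codec.createInputStream(block)));
      for (String line; (line = br.readLine()) != null;) {
        System.out.println(line);
      }
      start = offset;
    }
    fis.close();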