Repository: tez
Updated Branches:
  refs/heads/master 573ac29bd -> 2358521fa


TEZ-3807. InMemoryWriter is not tested with RLE enabled. Contributed by 
Muhammad Samir Khan


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2358521f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2358521f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2358521f

Branch: refs/heads/master
Commit: 2358521fab12f6e12f3fc1ff606b83408b413337
Parents: 573ac29
Author: Jason Lowe <[email protected]>
Authored: Tue Aug 1 15:31:11 2017 -0500
Committer: Jason Lowe <[email protected]>
Committed: Tue Aug 1 15:31:11 2017 -0500

----------------------------------------------------------------------
 .../shuffle/orderedgrouped/InMemoryWriter.java  |  9 +++--
 .../runtime/library/common/sort/impl/IFile.java |  6 ++--
 .../library/common/sort/impl/TestIFile.java     | 35 +++++++++++++-------
 3 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2358521f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
index 46dc72e..c5db7c9 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
@@ -48,9 +48,12 @@ public class InMemoryWriter extends Writer {
   }
 
   public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
-    super(null, null);
-    this.out =
-      new NonSyncDataOutputStream(new IFileOutputStream(arrayStream));
+    this(arrayStream, false);
+  }
+
+  public InMemoryWriter(BoundedByteArrayOutputStream arrayStream, boolean rle) 
{
+    super(null, null, rle);
+    this.out = new NonSyncDataOutputStream(new IFileOutputStream(arrayStream));
   }
 
   public void append(Object key, Object value) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/2358521f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index b502fc9..e460859 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -127,11 +127,11 @@ public class IFile {
            writesCounter, serializedBytesCounter);
       ownOutputStream = true;
     }
-    
-    protected Writer(TezCounter writesCounter, TezCounter 
serializedBytesCounter) {
+
+    protected Writer(TezCounter writesCounter, TezCounter 
serializedBytesCounter, boolean rle) {
       writtenRecordsCounter = writesCounter;
       serializedUncompressedBytes = serializedBytesCounter;
-      this.rle = false;
+      this.rle = rle;
     }
 
     public Writer(Configuration conf, FSDataOutputStream outputStream,

http://git-wip-us.apache.org/repos/asf/tez/blob/2358521f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index f06fda3..90f5374 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -313,6 +313,9 @@ public class TestIFile {
     writer = new IFile.Writer(defaultConf, out,
         Text.class, IntWritable.class, codec, null, null, true);
 
+    BoundedByteArrayOutputStream boundedOut = new 
BoundedByteArrayOutputStream(1024*1024);
+    Writer inMemWriter = new InMemoryWriter(boundedOut, true);
+
     DataInputBuffer kin = new DataInputBuffer();
     kin.reset(kvbuffer, pos, keyLength);
 
@@ -324,6 +327,8 @@ public class TestIFile {
     //Write initial KV pair
     writer.append(kin, vin);
     assertFalse(writer.sameKey);
+    inMemWriter.append(kin, vin);
+    assertFalse(inMemWriter.sameKey);
     pos += (keyLength + valueLength);
 
     //Second key is similar to key1 (RLE should kick in)
@@ -332,6 +337,8 @@ public class TestIFile {
     vin.reset(vout.getData(), vout.getLength());
     writer.append(kin, vin);
     assertTrue(writer.sameKey);
+    inMemWriter.append(kin, vin);
+    assertTrue(inMemWriter.sameKey);
     pos += (keyLength + valueLength);
 
     //Next key (key3) is different (RLE should not kick in)
@@ -340,9 +347,13 @@ public class TestIFile {
     vin.reset(vout.getData(), vout.getLength());
     writer.append(kin, vin);
     assertFalse(writer.sameKey);
+    inMemWriter.append(kin, vin);
+    assertFalse(inMemWriter.sameKey);
 
     writer.close();
     out.close();
+    inMemWriter.close();
+    boundedOut.close();
   }
 
   @Test(timeout = 5000)
@@ -416,25 +427,25 @@ public class TestIFile {
 
     //No RLE, No RepeatKeys, no compression
     writer = new InMemoryWriter(bout);
-    writeTestFileUsingDataBuffer(writer, false, false, data, null);
+    writeTestFileUsingDataBuffer(writer, false, data);
     readUsingInMemoryReader(bout.getBuffer(), data);
 
     //No RLE, RepeatKeys, no compression
     bout.reset();
     writer = new InMemoryWriter(bout);
-    writeTestFileUsingDataBuffer(writer, false, true, data, null);
+    writeTestFileUsingDataBuffer(writer, true, data);
     readUsingInMemoryReader(bout.getBuffer(), data);
 
     //RLE, No RepeatKeys, no compression
     bout.reset();
-    writer = new InMemoryWriter(bout);
-    writeTestFileUsingDataBuffer(writer, true, false, data, null);
+    writer = new InMemoryWriter(bout, true);
+    writeTestFileUsingDataBuffer(writer, false, data);
     readUsingInMemoryReader(bout.getBuffer(), data);
 
     //RLE, RepeatKeys, no compression
     bout.reset();
-    writer = new InMemoryWriter(bout);
-    writeTestFileUsingDataBuffer(writer, true, true, data, null);
+    writer = new InMemoryWriter(bout, true);
+    writeTestFileUsingDataBuffer(writer, true, data);
     readUsingInMemoryReader(bout.getBuffer(), data);
   }
 
@@ -753,13 +764,13 @@ public class TestIFile {
     FSDataOutputStream out = localFs.create(outputPath);
     IFile.Writer writer = new IFile.Writer(defaultConf, out,
         Text.class, IntWritable.class, codec, null, null, rle);
-    writeTestFile(writer, rle, repeatKeys, data, codec);
+    writeTestFile(writer, repeatKeys, data);
     out.close();
     return  writer;
   }
 
-  private Writer writeTestFile(IFile.Writer writer, boolean rle, boolean 
repeatKeys,
-      List<KVPair> data, CompressionCodec codec) throws IOException {
+  private Writer writeTestFile(IFile.Writer writer, boolean repeatKeys,
+      List<KVPair> data) throws IOException {
     assertNotNull(writer);
 
     Text previousKey = null;
@@ -786,13 +797,13 @@ public class TestIFile {
     FSDataOutputStream out = localFs.create(outputPath);
     IFile.Writer writer = new IFile.Writer(defaultConf, out,
         Text.class, IntWritable.class, codec, null, null, rle);
-    writeTestFileUsingDataBuffer(writer, rle, repeatKeys, data, codec);
+    writeTestFileUsingDataBuffer(writer, repeatKeys, data);
     out.close();
     return writer;
   }
 
-  private Writer writeTestFileUsingDataBuffer(IFile.Writer writer, boolean 
rle, boolean repeatKeys,
-      List<KVPair> data, CompressionCodec codec) throws IOException {
+  private Writer writeTestFileUsingDataBuffer(Writer writer, boolean 
repeatKeys,
+      List<KVPair> data) throws IOException {
     DataInputBuffer previousKey = new DataInputBuffer();
     DataInputBuffer key = new DataInputBuffer();
     DataInputBuffer value = new DataInputBuffer();

Reply via email to