Repository: tez
Updated Branches:
  refs/heads/master 573ac29bd -> 2358521fa
TEZ-3807. InMemoryWriter is not tested with RLE enabled. Contributed by Muhammad Samir Khan

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2358521f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2358521f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2358521f

Branch: refs/heads/master
Commit: 2358521fab12f6e12f3fc1ff606b83408b413337
Parents: 573ac29
Author: Jason Lowe <[email protected]>
Authored: Tue Aug 1 15:31:11 2017 -0500
Committer: Jason Lowe <[email protected]>
Committed: Tue Aug 1 15:31:11 2017 -0500

----------------------------------------------------------------------
 .../shuffle/orderedgrouped/InMemoryWriter.java  |  9 +++--
 .../runtime/library/common/sort/impl/IFile.java |  6 ++--
 .../library/common/sort/impl/TestIFile.java     | 35 +++++++++++++-------
 3 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/2358521f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
index 46dc72e..c5db7c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
@@ -48,9 +48,12 @@ public class InMemoryWriter extends Writer {
   }
 
   public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
-    super(null, null);
-    this.out =
-      new NonSyncDataOutputStream(new IFileOutputStream(arrayStream));
+    this(arrayStream, false);
+  }
+
+ public InMemoryWriter(BoundedByteArrayOutputStream arrayStream, boolean rle) { + super(null, null, rle); + this.out = new NonSyncDataOutputStream(new IFileOutputStream(arrayStream)); } public void append(Object key, Object value) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/2358521f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index b502fc9..e460859 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -127,11 +127,11 @@ public class IFile { writesCounter, serializedBytesCounter); ownOutputStream = true; } - - protected Writer(TezCounter writesCounter, TezCounter serializedBytesCounter) { + + protected Writer(TezCounter writesCounter, TezCounter serializedBytesCounter, boolean rle) { writtenRecordsCounter = writesCounter; serializedUncompressedBytes = serializedBytesCounter; - this.rle = false; + this.rle = rle; } public Writer(Configuration conf, FSDataOutputStream outputStream, http://git-wip-us.apache.org/repos/asf/tez/blob/2358521f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java index f06fda3..90f5374 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java @@ -313,6 +313,9 @@ public class TestIFile { writer = new IFile.Writer(defaultConf, out, Text.class, IntWritable.class, codec, null, null, true); + BoundedByteArrayOutputStream boundedOut = new BoundedByteArrayOutputStream(1024*1024); + Writer inMemWriter = new InMemoryWriter(boundedOut, true); + DataInputBuffer kin = new DataInputBuffer(); kin.reset(kvbuffer, pos, keyLength); @@ -324,6 +327,8 @@ public class TestIFile { //Write initial KV pair writer.append(kin, vin); assertFalse(writer.sameKey); + inMemWriter.append(kin, vin); + assertFalse(inMemWriter.sameKey); pos += (keyLength + valueLength); //Second key is similar to key1 (RLE should kick in) @@ -332,6 +337,8 @@ public class TestIFile { vin.reset(vout.getData(), vout.getLength()); writer.append(kin, vin); assertTrue(writer.sameKey); + inMemWriter.append(kin, vin); + assertTrue(inMemWriter.sameKey); pos += (keyLength + valueLength); //Next key (key3) is different (RLE should not kick in) @@ -340,9 +347,13 @@ public class TestIFile { vin.reset(vout.getData(), vout.getLength()); writer.append(kin, vin); assertFalse(writer.sameKey); + inMemWriter.append(kin, vin); + assertFalse(inMemWriter.sameKey); writer.close(); out.close(); + inMemWriter.close(); + boundedOut.close(); } @Test(timeout = 5000) @@ -416,25 +427,25 @@ public class TestIFile { //No RLE, No RepeatKeys, no compression writer = new InMemoryWriter(bout); - writeTestFileUsingDataBuffer(writer, false, false, data, null); + writeTestFileUsingDataBuffer(writer, false, data); readUsingInMemoryReader(bout.getBuffer(), data); //No RLE, RepeatKeys, no compression bout.reset(); writer = new InMemoryWriter(bout); - writeTestFileUsingDataBuffer(writer, false, true, data, null); + writeTestFileUsingDataBuffer(writer, true, data); readUsingInMemoryReader(bout.getBuffer(), data); //RLE, No RepeatKeys, no compression bout.reset(); - writer = new InMemoryWriter(bout); - 
writeTestFileUsingDataBuffer(writer, true, false, data, null); + writer = new InMemoryWriter(bout, true); + writeTestFileUsingDataBuffer(writer, false, data); readUsingInMemoryReader(bout.getBuffer(), data); //RLE, RepeatKeys, no compression bout.reset(); - writer = new InMemoryWriter(bout); - writeTestFileUsingDataBuffer(writer, true, true, data, null); + writer = new InMemoryWriter(bout, true); + writeTestFileUsingDataBuffer(writer, true, data); readUsingInMemoryReader(bout.getBuffer(), data); } @@ -753,13 +764,13 @@ public class TestIFile { FSDataOutputStream out = localFs.create(outputPath); IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class, IntWritable.class, codec, null, null, rle); - writeTestFile(writer, rle, repeatKeys, data, codec); + writeTestFile(writer, repeatKeys, data); out.close(); return writer; } - private Writer writeTestFile(IFile.Writer writer, boolean rle, boolean repeatKeys, - List<KVPair> data, CompressionCodec codec) throws IOException { + private Writer writeTestFile(IFile.Writer writer, boolean repeatKeys, + List<KVPair> data) throws IOException { assertNotNull(writer); Text previousKey = null; @@ -786,13 +797,13 @@ public class TestIFile { FSDataOutputStream out = localFs.create(outputPath); IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class, IntWritable.class, codec, null, null, rle); - writeTestFileUsingDataBuffer(writer, rle, repeatKeys, data, codec); + writeTestFileUsingDataBuffer(writer, repeatKeys, data); out.close(); return writer; } - private Writer writeTestFileUsingDataBuffer(IFile.Writer writer, boolean rle, boolean repeatKeys, - List<KVPair> data, CompressionCodec codec) throws IOException { + private Writer writeTestFileUsingDataBuffer(Writer writer, boolean repeatKeys, + List<KVPair> data) throws IOException { DataInputBuffer previousKey = new DataInputBuffer(); DataInputBuffer key = new DataInputBuffer(); DataInputBuffer value = new DataInputBuffer();
