Repository: tez Updated Branches: refs/heads/master 4ae87f0e5 -> e36f962e7
TEZ-2363: Fix off-by-one error in REDUCE_INPUT_RECORDS counter (gopalv) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e36f962e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e36f962e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e36f962e Branch: refs/heads/master Commit: e36f962e78f301974cd8eed2380a6a5bd26a49ae Parents: 4ae87f0 Author: Gopal V <[email protected]> Authored: Thu Apr 30 20:40:49 2015 +0530 Committer: Gopal V <[email protected]> Committed: Thu Apr 30 20:40:49 2015 +0530 ---------------------------------------------------------------------- .../runtime/library/common/ValuesIterator.java | 19 ++--- .../library/common/TestValuesIterator.java | 73 +++++++++++++++++++- 2 files changed, 81 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e36f962e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java index 0f1bc5b..a1f52e7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java @@ -95,14 +95,13 @@ public class ValuesIterator<KEY,VALUE> { readNextKey(); key = nextKey; nextKey = null; - hasMoreValues = more; isFirstRecord = false; } else { nextKey(); } return more; } - + /** The current key. */ public KEY getKey() { return key; @@ -162,11 +161,7 @@ public class ValuesIterator<KEY,VALUE> { while (hasMoreValues) { readNextKey(); } - if (more) { - inputKeyCounter.increment(1); - ++keyCtr; - } - + // move the next key to the current one KEY tmpKey = key; key = nextKey; @@ -185,8 +180,14 @@ public class ValuesIterator<KEY,VALUE> { keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength() - nextKeyBytes.getPosition()); nextKey = keyDeserializer.deserialize(nextKey); - // TODO Is a counter increment required here ? - hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0); + // hasMoreValues = is it first key or is key the same? + hasMoreValues = (key == null) || (comparator.compare(key, nextKey) == 0); + if (key == null || false == hasMoreValues) { + // invariant: more=true & there are no more values in an existing key group + // so this indicates start of new key group + inputKeyCounter.increment(1); + ++keyCtr; + } } else { hasMoreValues = in.isSameKey(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e36f962e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java index c483a81..e1718c8 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java @@ -64,6 +64,7 @@ import java.util.TreeMap; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -179,6 +180,32 @@ public class TestValuesIterator { } @Test(timeout = 20000) + public void testCountedIteratorWithInmemoryReader() throws IOException { + verifyCountedIteratorReader(true); + } + + @Test(timeout = 20000) + public void testCountedIteratorWithIFileReader() throws IOException { + verifyCountedIteratorReader(false); + } + + private void verifyCountedIteratorReader(boolean inMemory) throws IOException { + TezCounter keyCounter = new GenericCounter("inputKeyCounter", "y3"); + TezCounter tupleCounter = new GenericCounter("inputValuesCounter", "y4"); + ValuesIterator iterator = createCountedIterator(inMemory, keyCounter, + tupleCounter); + List<Integer> sequence = verifyIteratorData(iterator); + if (expectedTestResult) { + assertEquals((long) sequence.size(), keyCounter.getValue()); + long rows = 0; + for (Integer i : sequence) { + rows += i.longValue(); + } + assertEquals(rows, tupleCounter.getValue()); + } + } + + @Test(timeout = 20000) public void testIteratorWithIFileReaderEmptyPartitions() throws IOException { ValuesIterator iterator = createEmptyIterator(false); assert(iterator.moveToNext() == false); @@ -212,13 +239,19 @@ public class TestValuesIterator { /** * Tests whether data in valuesIterator matches with sorted input data set. - * + * + * Returns a list of value counts for each key. + * * @param valuesIterator + * @return List * @throws IOException */ - private void verifyIteratorData(ValuesIterator valuesIterator) throws IOException { + private List<Integer> verifyIteratorData( + ValuesIterator valuesIterator) throws IOException { boolean result = true; + ArrayList<Integer> sequence = new ArrayList<Integer>(); + //sort original data based on comparator ListMultimap<Writable, Writable> sortedMap = new ImmutableListMultimap.Builder<Writable, Writable>() @@ -240,6 +273,7 @@ public class TestValuesIterator { break; } + int valueCount = 0; //Verify values Iterator<Writable> vItr = valuesIterator.getValues().iterator(); for (Writable val : sortedMap.get(oriKey)) { @@ -250,13 +284,19 @@ public class TestValuesIterator { result = false; break; } + + valueCount++; } + sequence.add(valueCount); + assertTrue("At least 1 value per key", valueCount > 0); } if (expectedTestResult) { assertTrue(result); } else { assertFalse(result); } + + return sequence; } /** @@ -287,6 +327,35 @@ public class TestValuesIterator { (TezCounter) new GenericCounter("inputValueCounter", "y4")); } + /** + * Create sample data (in memory), with an attached counter and return ValuesIterator + * + * @param inMemory + * @param keyCounter + * @param tupleCounter + * @return ValuesIterator + * @throws IOException + */ + private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter) throws IOException { + if (!inMemory) { + streamPaths = createFiles(); + //Merge all files to get KeyValueIterator + rawKeyValueIterator = + TezMerger.merge(conf, fs, keyClass, valClass, null, + false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator, + new ProgressReporter(), null, null, null, null); + } else { + List<TezMerger.Segment> segments = createInMemStreams(); + rawKeyValueIterator = + TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir, + comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"), + new GenericCounter("writesCounter", "y1"), + new GenericCounter("bytesReadCounter", "y2"), new Progress()); + } + return new ValuesIterator(rawKeyValueIterator, comparator, + keyClass, valClass, conf, keyCounter, tupleCounter); + } + @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3} {4} {5} {6}]") public static Collection<Object[]> getParameters() { Collection<Object[]> parameters = new ArrayList<Object[]>();
