Repository: tez Updated Branches: refs/heads/master 210619a56 -> b0054628d
TEZ-2392. Have all readers throw an Exception on incorrect next() usage (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0054628 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0054628 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0054628 Branch: refs/heads/master Commit: b0054628df8d6d01cfed9bf850759ebc39c1e3b7 Parents: 210619a Author: Rajesh Balamohan <[email protected]> Authored: Tue May 5 11:02:07 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue May 5 11:02:07 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/mapreduce/input/MRInput.java | 3 +- .../tez/mapreduce/lib/MRReaderMapReduce.java | 5 +- .../tez/mapreduce/lib/MRReaderMapred.java | 3 + .../tez/mapreduce/input/TestMultiMRInput.java | 13 ++ .../tez/mapreduce/lib/TestKVReadersWithMR.java | 178 +++++++++++++++++++ .../tez/runtime/library/api/KeyValueReader.java | 17 ++ .../runtime/library/api/KeyValuesReader.java | 17 ++ .../runtime/library/common/ValuesIterator.java | 18 ++ .../common/readers/UnorderedKVReader.java | 10 +- .../input/ConcatenatedMergedKeyValueInput.java | 4 +- .../input/ConcatenatedMergedKeyValuesInput.java | 4 +- .../library/input/OrderedGroupedKVInput.java | 2 + .../input/OrderedGroupedMergedKVInput.java | 3 + .../runtime/library/input/UnorderedKVInput.java | 2 + .../library/common/TestValuesIterator.java | 21 ++- .../common/readers/TestUnorderedKVReader.java | 168 +++++++++++++++++ .../input/TestSortedGroupedMergedInput.java | 143 ++++++++++++++- 18 files changed, 595 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2ff7601..816c7a5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2392. Have all readers throw an Exception on incorrect next() usage. TEZ-2408. TestTaskAttempt fails to compile against hadoop-2.4 and hadoop-2.2. TEZ-2405. PipelinedSorter can throw NPE with custom compartor. TEZ-1897. Create a concurrent version of AsyncDispatcher http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 991f6d1..270f68f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -493,7 +493,8 @@ public class MRInput extends MRInputBase { /** * Returns a {@link KeyValueReader} that can be used to read - * Map Reduce compatible key value data + * Map Reduce compatible key value data. An exception will be thrown if next() + * is invoked after false, either from the framework or from the underlying InputFormat */ @Override public KeyValueReader getReader() throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java index 39cd79c..0495751 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java @@ -44,7 +44,7 @@ public class MRReaderMapReduce extends MRReader { @SuppressWarnings("rawtypes") private final InputFormat inputFormat; @SuppressWarnings("rawtypes") - private RecordReader recordReader; + protected RecordReader recordReader; private InputSplit inputSplit; private boolean setupComplete = false; @@ -120,6 +120,9 @@ public class MRReaderMapReduce extends MRReader { } if (hasNext) { inputRecordCounter.increment(1); + } else { + hasCompletedProcessing(); + completedProcessing = true; } return hasNext; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java index c4ad7a4..366e7a7 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java @@ -113,6 +113,9 @@ public class MRReaderMapred extends MRReader { boolean hasNext = recordReader.next(key, value); if (hasNext) { inputRecordCounter.increment(1); + } else { + hasCompletedProcessing(); + completedProcessing = true; } return hasNext; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java index 55f6bff..4031140 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java @@ -131,6 +131,12 @@ public class TestMultiMRInput { Object val = reader.getCurrentValue(); assertEquals(val, data1.remove(key)); } + try { + boolean hasNext = reader.next(); //should throw exception + fail(); + } catch(IOException e) { + assertTrue(e.getMessage().contains("For usage, please refer to")); + } } assertEquals(1, readerCount); } @@ -198,6 +204,13 @@ public class TestMultiMRInput { Object val = reader.getCurrentValue(); assertEquals(val, data.remove(key)); } + + try { + boolean hasNext = reader.next(); //should throw exception + fail(); + } catch(IOException e) { + assertTrue(e.getMessage().contains("For usage, please refer to")); + } } assertEquals(2, readerCount); } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java new file mode 100644 index 0000000..65f5ad0 --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.lib; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestKVReadersWithMR { + + private JobConf conf; + private TezCounters counters; + private TezCounter inputRecordCounter; + + @Before + public void setup() { + conf = new JobConf(); + counters = new TezCounters(); + inputRecordCounter = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED); + } + + @Test(timeout = 10000) + public void testMRReaderMapred() throws IOException { + //empty + testWithSpecificNumberOfKV(0); + + testWithSpecificNumberOfKV(10); + + //empty + testWithSpecificNumberOfKV_MapReduce(0); + + testWithSpecificNumberOfKV_MapReduce(10); + } + + public void testWithSpecificNumberOfKV(int kvPairs) throws IOException { + MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter); + + reader.recordReader = new DummyRecordReader(kvPairs); + int records = 0; + while (reader.next()) { + records++; + } + assertTrue(kvPairs == records); + + //reading again should fail + try { + boolean hasNext = reader.next(); + fail(); + } catch (IOException e) { + assertTrue(e.getMessage().contains("For usage, please refer to")); + } + + } + + public void testWithSpecificNumberOfKV_MapReduce(int kvPairs) throws IOException { + MRReaderMapReduce reader = new MRReaderMapReduce(conf, counters, inputRecordCounter, -1, 1, + 10, 20, 30); + + reader.recordReader = new DummyRecordReaderMapReduce(kvPairs); + int records = 0; + while (reader.next()) { + records++; + } + assertTrue(kvPairs == records); + + //reading again should fail + try { + boolean hasNext = reader.next(); + fail(); + } catch (IOException e) { + assertTrue(e.getMessage().contains("For usage, please refer to")); + } + } + + static class DummyRecordReader implements RecordReader { + int records; + + public DummyRecordReader(int records) { + this.records = records; + } + + @Override + public boolean next(Object o, Object o2) throws IOException { + return (records-- > 0); + } + + @Override + public Object createKey() { + return null; + } + + @Override + public Object createValue() { + return null; + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0; + } + } + + static class DummyRecordReaderMapReduce extends org.apache.hadoop.mapreduce.RecordReader { + int records; + + public DummyRecordReaderMapReduce(int records) { + this.records = records; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return (records-- > 0); + } + + @Override + public Object getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override + public Object getCurrentValue() throws IOException, InterruptedException { + return null; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void close() throws IOException { + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java index 67b6f85..d504d08 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java @@ -34,11 +34,15 @@ import org.apache.tez.runtime.api.Reader; * Object value = kvReader.getCurrentValue(); * </code> * + * if next() is called after processing everything, + * IOException would be thrown */ @Public @Evolving public abstract class KeyValueReader extends Reader { + protected boolean completedProcessing; + /** * Moves to the next key/values(s) pair * @@ -62,4 +66,17 @@ public abstract class KeyValueReader extends Reader { * @throws IOException */ public abstract Object getCurrentValue() throws IOException; + + /** + * Check whether processing has been completed. + * + * @throws IOException + */ + protected void hasCompletedProcessing() throws IOException { + if (completedProcessing) { + throw new IOException("Please check if you are" + + " invoking next() even after it returned false. For usage, please refer to " + + "KeyValueReader javadocs"); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java index 0bb2777..510f4b7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java @@ -34,11 +34,15 @@ import org.apache.tez.runtime.api.Reader; * Iterable<Object> values = kvReader.getCurrentValues(); * </code> * + * if next() is called after processing everything, + * IOException would be thrown */ @Public @Evolving public abstract class KeyValuesReader extends Reader { + protected boolean completedProcessing; + /** * Moves to the next key/values(s) pair * @@ -60,4 +64,17 @@ public abstract class KeyValuesReader extends Reader { * @return an Iterable view of the values associated with the current key */ public abstract Iterable<Object> getCurrentValues() throws IOException; + + /** + * Check whether processing has been completed. + * + * @throws IOException + */ + protected void hasCompletedProcessing() throws IOException { + if (completedProcessing) { + throw new IOException("Please check if you are" + + " invoking next() even after it returned false. For usage, please refer to " + + "KeyValuesReader javadocs"); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java index a1f52e7..24f9f8a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java @@ -63,6 +63,8 @@ public class ValuesIterator<KEY,VALUE> { private int keyCtr = 0; private boolean hasMoreValues; // For the current key. private boolean isFirstRecord = true; + + private boolean completedProcessing; public ValuesIterator (TezRawKeyValueIterator in, RawComparator<KEY> comparator, @@ -99,6 +101,10 @@ public class ValuesIterator<KEY,VALUE> { } else { nextKey(); } + if (!more) { + hasCompletedProcessing(); + completedProcessing = true; + } return more; } @@ -206,4 +212,16 @@ public class ValuesIterator<KEY,VALUE> { nextValueBytes.getLength() - nextValueBytes.getPosition()); value = valDeserializer.deserialize(value); } + + /** + * Check whether processing has been completed. + * + * @throws IOException + */ + protected void hasCompletedProcessing() throws IOException { + if (completedProcessing) { + throw new IOException("Please check if you are invoking moveToNext() even after it returned" + + " false."); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index 46af66d..b14a461 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -71,8 +71,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { // the counter at the moment will generate aggregate numbers. private int numRecordsRead = 0; - private boolean completedProcessing; - + public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize, TezCounter inputRecordCounter) @@ -131,13 +130,6 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { } } - private void hasCompletedProcessing() throws IOException { - if (completedProcessing) { - throw new IOException("Reader has already processed all the inputs. Please check if you are" - + " invoking next() even after it returned false. For usage, please refer to " - + "KeyValueReader javadocs"); - } - } @Override public Object getCurrentKey() throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java index 39e0fff..14b1e2c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java @@ -46,11 +46,13 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { public class ConcatenatedMergedKeyValueReader extends KeyValueReader { private int currentReaderIndex = 0; private KeyValueReader currentReader; - + @Override public boolean next() throws IOException { while ((currentReader == null) || !currentReader.next()) { if (currentReaderIndex == getInputs().size()) { + hasCompletedProcessing(); + completedProcessing = true; return false; } try { http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java index 0cc3244..2a1e4c6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java @@ -47,11 +47,13 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader { private int currentReaderIndex = 0; private KeyValuesReader currentReader; - + @Override public boolean next() throws IOException { while ((currentReader == null) || !currentReader.next()) { if (currentReaderIndex == getInputs().size()) { + hasCompletedProcessing(); + completedProcessing = true; return false; } try { http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index e61dbdc..d784fcd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -218,6 +218,8 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { return new KeyValuesReader() { @Override public boolean next() throws IOException { + hasCompletedProcessing(); + completedProcessing = true; return false; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java index 9adac54..41ca7c9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java @@ -126,6 +126,9 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput { currentKey = nextKVReader.getCurrentKey(); currentValues.moveToNext(); return true; + } else { + hasCompletedProcessing(); + completedProcessing = true; } return false; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index ce27103..62fa9a5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -164,6 +164,8 @@ public class UnorderedKVInput extends AbstractLogicalInput { return new KeyValueReader() { @Override public boolean next() throws IOException { + hasCompletedProcessing(); + completedProcessing = true; return false; } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java index e1718c8..edb9b15 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java @@ -65,6 +65,7 @@ import java.util.TreeMap; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -208,10 +209,19 @@ public class TestValuesIterator { @Test(timeout = 20000) public void testIteratorWithIFileReaderEmptyPartitions() throws IOException { ValuesIterator iterator = createEmptyIterator(false); - assert(iterator.moveToNext() == false); + assertTrue(iterator.moveToNext() == false); iterator = createEmptyIterator(true); - assert(iterator.moveToNext() == false); + assertTrue(iterator.moveToNext() == false); + } + + private void getNextFromFinishedIterator(ValuesIterator iterator) { + try { + boolean hasNext = iterator.moveToNext(); + fail(); + } catch(IOException e) { + assertTrue(e.getMessage().contains("Please check if you are invoking moveToNext()")); + } } private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException { @@ -292,7 +302,14 @@ public class TestValuesIterator { } if (expectedTestResult) { assertTrue(result); + + assertFalse(valuesIterator.moveToNext()); + getNextFromFinishedIterator(valuesIterator); } else { + while(valuesIterator.moveToNext()) { + //iterate through all keys + } + getNextFromFinishedIterator(valuesIterator); assertFalse(result); } http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java new file mode 100644 index 0000000..51ea42d --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.common.readers; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.shuffle.FetchedInput; +import org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback; +import org.apache.tez.runtime.library.common.shuffle.LocalDiskFetchedInput; +import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager; +import org.apache.tez.runtime.library.common.sort.impl.IFile; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedList; + +import static junit.framework.TestCase.fail; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +public class TestUnorderedKVReader { + + private static final Logger LOG = LoggerFactory.getLogger(TestUnorderedKVReader.class); + + private static Configuration defaultConf = new Configuration(); + private static FileSystem localFs = null; + private static Path workDir = null; + + private String outputFileName = "ifile.out"; + private Path outputPath; + private long rawLen; + private long compLen; + + private UnorderedKVReader<Text, Text> unorderedKVReader; + + static { + defaultConf.set("fs.defaultFS", "file:///"); + try { + localFs = FileSystem.getLocal(defaultConf); + workDir = new Path( + new Path(System.getProperty("test.build.data", "/tmp")), + TestUnorderedKVReader.class.getName()) + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + LOG.info("Using workDir: " + workDir); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Before + public void setUp() throws Exception { + outputPath = new Path(workDir, outputFileName); + setupReader(); + } + + private void setupReader() throws IOException, InterruptedException { + defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); + + createIFile(outputPath, 1); + + final LinkedList<LocalDiskFetchedInput> inputs = new LinkedList<LocalDiskFetchedInput>(); + LocalDiskFetchedInput realFetchedInput = new LocalDiskFetchedInput(0, rawLen, compLen, new + InputAttemptIdentifier(0, 0), outputPath, defaultConf, new FetchedInputCallback() { + @Override + public void fetchComplete(FetchedInput fetchedInput) { + } + + @Override + public void fetchFailed(FetchedInput fetchedInput) { + } + + @Override + public void freeResources(FetchedInput fetchedInput) { + } + }); + LocalDiskFetchedInput fetchedInput = spy(realFetchedInput); + doNothing().when(fetchedInput).free(); + + inputs.add(fetchedInput); + + TezCounters counters = new TezCounters(); + TezCounter inputRecords = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED); + + ShuffleManager manager = mock(ShuffleManager.class); + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return (inputs.isEmpty()) ? null : inputs.remove(); + } + }).when(manager).getNextInput(); + + unorderedKVReader = new UnorderedKVReader<Text, Text>(manager, + defaultConf, null, false, -1, -1, inputRecords); + } + + private void createIFile(Path path, int recordCount) throws IOException { + FSDataOutputStream out = localFs.create(path); + IFile.Writer writer = + new IFile.Writer(defaultConf, out, Text.class, Text.class, null, null, null, true); + + for (int i = 0; i < recordCount; i++) { + writer.append(new Text("Key_" + i), new Text("Value_" + i)); + } + writer.close(); + rawLen = writer.getRawLength(); + compLen = writer.getCompressedLength(); + out.close(); + } + + @Before + @After + public void cleanup() throws Exception { + localFs.delete(workDir, true); + } + + @Test(timeout = 5000) + public void testReadingMultipleTimes() throws Exception { + int counter = 0; + while (unorderedKVReader.next()) { + unorderedKVReader.getCurrentKey(); + unorderedKVReader.getCurrentKey(); + counter++; + } + Assert.assertEquals(1, counter); + + //Check the reader again. This shouldn't throw EOF exception in IFile + try { + boolean next = unorderedKVReader.next(); + fail(); + } catch(IOException ioe) { + Assert.assertTrue(ioe.getMessage().contains("For usage, please refer to")); + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java index 570deb7..0de400e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java @@ -19,7 +19,7 @@ package org.apache.tez.runtime.library.input; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -35,7 +35,9 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.MergedInputContext; +import org.apache.tez.runtime.api.Reader; import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl; +import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.KeyValuesReader; import org.junit.Test; @@ -82,6 +84,18 @@ public class TestSortedGroupedMergedInput { } assertEquals(6, valCount); } + + getNextFromFinishedReader(kvsReader); + } + + private void getNextFromFinishedReader(KeyValuesReader kvsReader) { + //Try reading again and it should throw IOException + try { + boolean hasNext = kvsReader.next(); + fail(); + } catch(IOException e) { + assertTrue(e.getMessage().contains("For usage, please refer to")); + } } @Test(timeout = 5000) @@ -126,6 +140,7 @@ public class TestSortedGroupedMergedInput { } assertEquals(6, valCount); } + getNextFromFinishedReader(kvsReader); } @Test(timeout = 5000) @@ -172,6 +187,7 @@ public class TestSortedGroupedMergedInput { assertEquals(6, valCount); } } + getNextFromFinishedReader(kvsReader); } @Test(timeout = 5000) @@ -223,6 +239,7 @@ public class TestSortedGroupedMergedInput { fail("Unexpected key"); } } + getNextFromFinishedReader(kvsReader); } @Test(timeout = 5000) @@ -277,6 +294,7 @@ public class TestSortedGroupedMergedInput { fail("Unexpected key"); } } + getNextFromFinishedReader(kvsReader); } // Reads all values for a key, but doesn't trigger the last hasNext() call. @@ -324,6 +342,7 @@ public class TestSortedGroupedMergedInput { } assertEquals(6, valCount); } + getNextFromFinishedReader(kvsReader); } @Test(timeout = 5000) @@ -350,7 +369,84 @@ public class TestSortedGroupedMergedInput { OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs); KeyValuesReader kvsReader = input.getReader(); - assertFalse(kvsReader.next()); + assertTrue(kvsReader.next() == false); + getNextFromFinishedReader(kvsReader); + } + + @Test(timeout = 5000) + public void testSimpleConcatenatedMergedKeyValueInput() throws Exception { + + DummyInput sInput1 = new DummyInput(10); + DummyInput sInput2 = new DummyInput(10); + DummyInput sInput3 = new DummyInput(10); + + List<Input> sInputs = new LinkedList<Input>(); + sInputs.add(sInput1); + sInputs.add(sInput2); + sInputs.add(sInput3); + ConcatenatedMergedKeyValueInput input = + new ConcatenatedMergedKeyValueInput(createMergedInputContext(), sInputs); + + KeyValueReader kvReader = input.getReader(); + int keyCount = 0; + while (kvReader.next()) { + keyCount++; + Integer key = (Integer) kvReader.getCurrentKey(); + Integer value = (Integer) kvReader.getCurrentValue(); + } + assertTrue(keyCount == 30); + + getNextFromFinishedReader(kvReader); + } + + @Test(timeout = 5000) + public void testSimpleConcatenatedMergedKeyValuesInput() throws Exception { + SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 }, + new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } }); + + SortedTestKeyValuesReader kvsReader2 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 }, + new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } }); + + SortedTestKeyValuesReader kvsReader3 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 }, + new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } }); + + SortedTestInput sInput1 = new SortedTestInput(kvsReader1); + SortedTestInput sInput2 = new SortedTestInput(kvsReader2); + SortedTestInput sInput3 = new SortedTestInput(kvsReader3); + + List<Input> sInputs = new LinkedList<Input>(); + sInputs.add(sInput1); + sInputs.add(sInput2); + sInputs.add(sInput3); + ConcatenatedMergedKeyValuesInput input = + new ConcatenatedMergedKeyValuesInput(createMergedInputContext(), sInputs); + + KeyValuesReader kvsReader = input.getReader(); + int keyCount = 0; + while (kvsReader.next()) { + keyCount++; + Integer key = (Integer) kvsReader.getCurrentKey(); + Iterator<Object> valuesIter = kvsReader.getCurrentValues().iterator(); + int valCount = 0; + while (valuesIter.hasNext()) { + valCount++; + Integer val = (Integer) valuesIter.next(); + } + assertEquals(2, valCount); + } + assertEquals(9, keyCount); + + getNextFromFinishedReader(kvsReader); + } + + private void getNextFromFinishedReader(KeyValueReader kvReader) { + //Try reading again and it should throw IOException + try { + boolean hasNext = kvReader.next(); + fail(); + } catch(IOException e) { + assertTrue(e.getMessage().contains("For usage, please refer to")); + } } private static class SortedTestInput extends OrderedGroupedKVInput { @@ -404,8 +500,10 @@ public class TestSortedGroupedMergedInput { @Override public boolean next() throws IOException { + hasCompletedProcessing(); currentIndex++; if (keys == null || currentIndex >= keys.length) { + completedProcessing = true; return false; } return true; @@ -426,6 +524,47 @@ public class TestSortedGroupedMergedInput { } } + private static class DummyInput implements Input { + DummyKeyValueReader reader; + + public DummyInput(int records) { + reader = new DummyKeyValueReader(records); + } + + @Override + public void start() throws Exception { + } + + @Override + public Reader getReader() throws Exception { + return reader; + } + } + + private static class DummyKeyValueReader extends KeyValueReader { + private int records; + + public DummyKeyValueReader(int records) { + this.records = records; + } + + @Override + public boolean next() throws IOException { + return (records-- > 0); + } + + @Override + public Object getCurrentKey() throws IOException { + return records; + } + + @Override + public Object getCurrentValue() throws IOException { + return records; + } + } + + private static class RawComparatorForTest implements RawComparator<Integer> { @Override
