A new pattern has been identified: Pig jobs that succeed with MR but fail with Tez. We are hoping to brainstorm ideas so we can identify the underlying issues and file targeted JIRAs.
Here is a typical stack trace. The OOM sometimes occurs during the final merge instead, since the in-memory segment overhead is larger than the MapOutput overhead.

Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.hadoop.io.DataInputBuffer.<init>(DataInputBuffer.java:68)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader.<init>(InMemoryReader.java:42)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager.createInMemorySegments(MergeManager.java:837)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager.access$200(MergeManager.java:75)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager$InMemoryMerger.merge(MergeManager.java:642)
    at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeThread.run(MergeThread.java:89)

*Details*
Around 1,000,000 spills were fetched, committing around 100MB to the memory budget (about 500,000 of them held in memory). However, the actual memory used by those 500,000 segments (50-350 bytes each) is 480MB, versus the expected 100-200MB.

*MapOutput overhead is not budgeted*
Each MapOutput needs around 50 bytes in addition to its data.

*In-memory segment overhead is not budgeted*
Each in-memory segment needs around 80 bytes in addition to its data.

*Interaction with auto-reduce parallelism*
In this scenario, the upstream vertex assumed 999 downstream tasks (Pig's default hint for using auto-reduce parallelism). However, auto-reduce parallelism then reduced the downstream vertex to 24 tasks, which puts roughly 40 times more segments on each downstream task. Should auto-reduce parallelism take merge overhead into account when calculating parallelism?

*Legacy default sorter and empty segments*
The default (legacy) sorter does not optimize away empty segments the way the pipelined sorter does, so it exhibits this symptom more often.
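To put a number on the auto-reduce interaction, here is a rough Java sketch. Everything in it is hypothetical: the class and method names are not Tez APIs (in particular, this is not how ShuffleVertexManager computes parallelism). It assumes each upstream task emits one segment per partition, that partitions are spread evenly across downstream tasks, and that a MapOutput and its in-memory segment are both live during the merge. The upstream task count of 26,000 is an illustrative value chosen so the total lands near the ~1.09M fetches in the log below.

```java
/**
 * Hypothetical sketch (not a Tez API): estimates shuffle segments per downstream
 * task after auto-reduce parallelism, and the smallest parallelism that keeps
 * the unbudgeted per-segment overhead under a chosen limit.
 */
final class AutoReduceOverheadSketch {

    // Approximate unbudgeted overhead per fetched segment, using the figures in
    // this report: ~50 bytes per MapOutput plus ~80 bytes per in-memory segment,
    // assuming both objects are live at the same time during the merge.
    static final long OVERHEAD_PER_SEGMENT_BYTES = 50 + 80;

    /** Segments per downstream task, assuming every upstream task emits one
     *  segment per partition and partitions are grouped evenly (rounded up). */
    static long segmentsPerTask(long upstreamTasks, int partitions, int downstreamTasks) {
        long partitionsPerTask = (partitions + downstreamTasks - 1) / downstreamTasks;
        return upstreamTasks * partitionsPerTask;
    }

    /** Smallest downstream parallelism (at most `partitions`) whose estimated
     *  per-task overhead fits within the given budget. */
    static int minParallelismForOverhead(long upstreamTasks, int partitions, long overheadBudgetBytes) {
        for (int tasks = 1; tasks <= partitions; tasks++) {
            long overhead = segmentsPerTask(upstreamTasks, partitions, tasks) * OVERHEAD_PER_SEGMENT_BYTES;
            if (overhead <= overheadBudgetBytes) {
                return tasks;
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        long upstreamTasks = 26_000; // hypothetical, for illustration only
        int partitions = 999;        // Pig's default parallelism hint
        int reduced = 24;            // parallelism chosen by auto-reduce in this job
        long budget = 32L << 20;     // arbitrary 32MB overhead limit, not a Tez setting

        System.out.println("segments/task at 999 tasks: " + segmentsPerTask(upstreamTasks, partitions, partitions));
        System.out.println("segments/task at 24 tasks:  " + segmentsPerTask(upstreamTasks, partitions, reduced));
        System.out.println("min tasks within budget:    " + minParallelismForOverhead(upstreamTasks, partitions, budget));
    }
}
```

With these inputs it reports 26,000 segments per task at 999 tasks, 1,092,000 at 24 tasks, and 111 as the smallest parallelism that keeps the estimated overhead under 32MB. An overhead-aware auto-reduce would need some such floor; where to set the limit is exactly the open question above.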
2016-01-10 11:46:01,208 [INFO] [fetcher {scope_601} #7] |orderedgrouped.MergeManager|: closeInMemoryFile -> map-output of size: 116, inMemoryMapOutputs.size() -> 571831, commitMemory -> 91503730, usedMemory ->91503846, mapOutput=MapOutput( AttemptIdentifier: InputAttemptIdentifier [inputIdentifier=Input Identifier [inputIndex=763962], attemptNumber=0, pathComponent=attempt_1444791925832_10460712_1_00_017766_0_10003, spillType=0, spillId=-1], Type: MEMORY)
2016-01-10 11:46:01,208 [INFO] [fetcher {scope_601} #7] |orderedgrouped.ShuffleScheduler|: Completed fetch for attempt: {763962, 0, attempt_1444791925832_10460712_1_00_017766_0_10003} to MEMORY, csize=128, dsize=116, EndTime=1452426361208, TimeTaken=0, Rate=0.00 MB/s
2016-01-10 11:46:01,209 [INFO] [fetcher {scope_601} #7] |orderedgrouped.ShuffleScheduler|: scope_601: All inputs fetched for input vertex : scope-601
2016-01-10 11:46:01,209 [INFO] [fetcher {scope_601} #7] |orderedgrouped.ShuffleScheduler|: copy(1091856 (spillsFetched=1091856) of 1091856. Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) 0.68 MB/s)
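The MergeManager log line above gives enough numbers for a back-of-the-envelope estimate of the unbudgeted overhead. Below is a minimal Java sketch. It assumes the approximate per-object overheads quoted in this report (~50 bytes per MapOutput, ~80 bytes per in-memory segment) and that both objects exist for every in-memory output. The class name is hypothetical and nothing here is read from Tez code.

```java
/**
 * Hypothetical back-of-the-envelope check using the MergeManager log figures.
 * Overhead constants are the approximate values quoted in this report.
 */
final class ShuffleOverheadEstimate {

    static final long MAP_OUTPUT_OVERHEAD_BYTES = 50;
    static final long IN_MEMORY_SEGMENT_OVERHEAD_BYTES = 80;

    /** Overhead not charged against the shuffle memory budget, assuming each
     *  in-memory output carries both a MapOutput and an in-memory segment. */
    static long unbudgetedOverhead(long inMemoryOutputs) {
        return inMemoryOutputs * (MAP_OUTPUT_OVERHEAD_BYTES + IN_MEMORY_SEGMENT_OVERHEAD_BYTES);
    }

    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long inMemoryOutputs = 571_831; // inMemoryMapOutputs.size() from the log
        long commitMemory = 91_503_730; // commitMemory from the log (bytes committed to the budget)

        long avgDataBytes = commitMemory / inMemoryOutputs;
        long overhead = unbudgetedOverhead(inMemoryOutputs);

        System.out.printf("avg data per output: %d bytes%n", avgDataBytes);
        System.out.printf("unbudgeted overhead: %.1f MiB%n", toMiB(overhead));
        System.out.printf("data + overhead:     %.1f MiB%n", toMiB(commitMemory + overhead));
    }
}
```

It reports about 160 bytes of data per output and roughly 70.9 MiB of overhead, or about 158.2 MiB in total. It covers only the two overheads quantified above and does not attempt to reproduce the 480MB measurement.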