Repository: tez
Updated Branches:
  refs/heads/master db6f05f7f -> 1c31b7978
TEZ-3699. For large dataset, pipelined shuffle throws exceptions in consumer side for UnorderedPartitioned edge (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1c31b797
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1c31b797
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1c31b797

Branch: refs/heads/master
Commit: 1c31b7978db0461c1f6626952d1c61d15f4bf21b
Parents: db6f05f
Author: Rajesh Balamohan <[email protected]>
Authored: Thu Apr 27 05:13:33 2017 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Thu Apr 27 05:13:33 2017 +0530

----------------------------------------------------------------------
 .../library/common/writers/UnorderedPartitionedKVWriter.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/1c31b797/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index d8cedac..ea49118 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -380,9 +380,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     pendingSpillCount.incrementAndGet();
+    int spillNumber = numSpills.getAndIncrement();
     ListenableFuture<SpillResult> future = spillExecutor.submit(
-        new SpillCallable(currentBuffer, codec, spilledRecordsCounter, numSpills.getAndIncrement()));
-    Futures.addCallback(future, new SpillCallback(numSpills.get()));
+        new SpillCallable(currentBuffer, codec, spilledRecordsCounter, spillNumber));
+    Futures.addCallback(future, new SpillCallback(spillNumber));
     // Update once per buffer (instead of every record)
     updateTezCountersAndNotify();
