Repository: tez Updated Branches: refs/heads/master b3353ea27 -> 6f1713d03
TEZ-1491. Tez reducer-side merge's counter update is slow. (gopalv via rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6f1713d0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6f1713d0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6f1713d0 Branch: refs/heads/master Commit: 6f1713d033f6b37287b1bf10b07c49dcef916f32 Parents: b3353ea Author: Rajesh Balamohan <[email protected]> Authored: Thu Dec 10 09:58:07 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Dec 10 09:58:07 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/sort/impl/PipelinedSorter.java | 4 +++- .../library/common/sort/impl/TezMerger.java | 7 ++++--- .../tez/runtime/library/utils/LocalProgress.java | 19 +++++++++++++++++++ 4 files changed, 27 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6f1713d0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cfa93bf..276651f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-1491. Tez reducer-side merge's counter update is slow. TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start TEZ-2346. 
TEZ-UI: Lazy load other info / counter data http://git-wip-us.apache.org/repos/asf/tez/blob/6f1713d0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 33a65d2..fc65622 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -36,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; + import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ import org.apache.tez.runtime.library.common.ConfigUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; +import org.apache.tez.runtime.library.utils.LocalProgress; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -1044,7 +1046,7 @@ public class PipelinedSorter extends ExternalSorter { private final SortSpan span; private final InputByteBuffer key = new InputByteBuffer(); private final InputByteBuffer value = new InputByteBuffer(); - private final Progress progress = new Progress(); + private final Progress progress = new LocalProgress(); private static final int minrun = (1 << 4); 
http://git-wip-us.apache.org/repos/asf/tez/blob/6f1713d0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index 35a9276..44618a0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -24,6 +24,7 @@ import java.util.Comparator; import java.util.List; import com.google.common.annotations.VisibleForTesting; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -49,6 +50,7 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader; import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader.KeyState; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; import org.apache.tez.runtime.library.utils.BufferUtils; +import org.apache.tez.runtime.library.utils.LocalProgress; /** * Merger is an utility class used by the Map and Reduce tasks for merging @@ -402,7 +404,7 @@ public class TezMerger { private long totalBytesProcessed; private float progPerByte; - private Progress mergeProgress = new Progress(); + private Progress mergeProgress = new LocalProgress(); // Boolean variable for including/considering final merge as part of sort // phase or not. This is true in map task, false in reduce task. It is // used in calculating mergeProgress. 
@@ -922,10 +924,9 @@ public class TezMerger { } private static class EmptyIterator implements TezRawKeyValueIterator { - final Progress progress; + final Progress progress = new Progress(); EmptyIterator() { - progress = new Progress(); progress.set(1.0f); } http://git-wip-us.apache.org/repos/asf/tez/blob/6f1713d0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/LocalProgress.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/LocalProgress.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/LocalProgress.java new file mode 100644 index 0000000..9d653fc --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/LocalProgress.java @@ -0,0 +1,19 @@ +package org.apache.tez.runtime.library.utils; + +import org.apache.hadoop.util.Progress; + +/** + * Hadoop Progress whose set() skips the locking super.set() call when the value equals the last one set. Not thread-safe: the cached value is a plain, unsynchronized field. It starts as NaN, which compares unequal to every float, so the first set() always reaches super.set(). NOTE(review): only set() refreshes the cache; confirm no other Progress mutator is used on these instances. + */ +public final class LocalProgress extends Progress { + private float currentProgress = Float.NaN; + + @Override + public void set(float progress) { + if (progress != currentProgress) { + currentProgress = progress; + // value changed: only now take the superclass's locking set path + super.set(progress); + } + } +}
