Repository: tez
Updated Branches:
  refs/heads/branch-0.7 9d5e12728 -> 2a8368c66
TEZ-2716. DefaultSorter.isRleNeeded not thread safe (rbalamohan) (cherry picked from commit 592c74204c5428a8d4d45612d58363feb5cdf3b1) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2a8368c6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2a8368c6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2a8368c6 Branch: refs/heads/branch-0.7 Commit: 2a8368c66912fe38da4590b53962565a308f277f Parents: 9d5e127 Author: Rajesh Balamohan <[email protected]> Authored: Wed Sep 23 04:14:01 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Sep 23 04:15:13 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + tez-runtime-library/findbugs-exclude.xml | 6 ---- .../common/sort/impl/dflt/DefaultSorter.java | 33 ++++++++++++++------ 3 files changed, 25 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2a8368c6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2138d7c..ab1c29c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2716. DefaultSorter.isRleNeeded not thread safe TEZ-2847. Tez UI: Task details doesn't gets updated on manual refresh after job complete TEZ-2844. Backport TEZ-2775 to branch-0.7. Improve and consolidate logging in Runtime components. TEZ-2843. 
Tez UI: Show error if in progress fails due to AM not reachable http://git-wip-us.apache.org/repos/asf/tez/blob/2a8368c6/tez-runtime-library/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml index aa1c7a2..45c194c 100644 --- a/tez-runtime-library/findbugs-exclude.xml +++ b/tez-runtime-library/findbugs-exclude.xml @@ -81,12 +81,6 @@ </Match> <Match> - <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/> - <Field name="totalKeys"/> - <Bug pattern="IS2_INCONSISTENT_SYNC"/> - </Match> - - <Match> <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/> <Field name="PARSER"/> <Bug pattern="MS_SHOULD_BE_FINAL"/> http://git-wip-us.apache.org/repos/asf/tez/blob/2a8368c6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 2a10c35..ac90112 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -59,7 +59,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; import com.google.common.base.Preconditions; @SuppressWarnings({"unchecked", "rawtypes"}) -public class DefaultSorter extends ExternalSorter implements IndexedSortable { +public final class DefaultSorter extends ExternalSorter implements IndexedSortable { private static final Logger LOG = LoggerFactory.getLogger(DefaultSorter.class); @@ -663,7 +663,13 @@ public class 
DefaultSorter extends ExternalSorter implements IndexedSortable { + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec); } - sortAndSpill(); + long sameKeyCount = 0; + long totalKeysCount = 0; + synchronized (this) { + sameKeyCount = sameKey; + totalKeysCount = totalKeys; + } + sortAndSpill(sameKeyCount, totalKeysCount); } } catch (InterruptedException e) { throw new IOException("Interrupted while waiting for the writer", e); @@ -698,6 +704,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { protected class SpillThread extends Thread { + volatile long totalKeysCount; + volatile long sameKeyCount; + @Override public void run() { spillLock.lock(); @@ -710,7 +719,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } try { spillLock.unlock(); - sortAndSpill(); + sortAndSpill(sameKeyCount, totalKeysCount); } catch (Throwable t) { LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an exception in sortAndSpill", t); sortSpillException = t; @@ -731,6 +740,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillThreadRunning = false; } } + + public void setTotalKeysProcessed(long sameKeyCount, long totalKeysCount) { + this.sameKeyCount = sameKeyCount; + this.totalKeysCount = totalKeysCount; + } } private void checkSpillException() throws IOException { @@ -757,6 +771,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { +", kvend = " + kvend + "(" + (kvend * 4) + ")" + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec); } + spillThread.setTotalKeysProcessed(sameKey, totalKeys); spillReady.signal(); } @@ -771,16 +786,16 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { : kvmeta.capacity() + kvstart) / NMETA; } - private boolean isRLENeeded() { - return (sameKey > (0.1 * totalKeys)) || (sameKey < 0); + private boolean isRLENeeded(long 
sameKeyCount, long totalKeysCount) { + return (sameKeyCount > (0.1 * totalKeysCount)) || (sameKeyCount < 0); } - protected void sortAndSpill() + protected void sortAndSpill(long sameKeyCount, long totalKeysCount) throws IOException, InterruptedException { final int mstart = getMetaStart(); final int mend = getMetaEnd(); sorter.sort(this, mstart, mend, nullProgressable); - spill(mstart, mend); + spill(mstart, mend, sameKeyCount, totalKeysCount); } private void adjustSpillCounters(long rawLen, long compLength) { @@ -799,7 +814,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } - protected void spill(int mstart, int mend) + protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount) throws IOException, InterruptedException { //approximate the length of the output file to be the length of the @@ -819,7 +834,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { int spindex = mstart; final InMemValBytes value = createInMemValBytes(); - boolean rle = isRLENeeded(); + boolean rle = isRLENeeded(sameKeyCount, totalKeysCount); for (int i = 0; i < partitions; ++i) { IFile.Writer writer = null; try {
