Repository: tez
Updated Branches:
  refs/heads/master 81a12216c -> 592c74204
TEZ-2716. DefaultSorter.isRleNeeded not thread safe (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/592c7420
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/592c7420
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/592c7420

Branch: refs/heads/master
Commit: 592c74204c5428a8d4d45612d58363feb5cdf3b1
Parents: 81a1221
Author: Rajesh Balamohan <[email protected]>
Authored: Wed Sep 23 04:14:01 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Wed Sep 23 04:14:01 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 tez-runtime-library/findbugs-exclude.xml        |  6 ----
 .../common/sort/impl/dflt/DefaultSorter.java    | 33 ++++++++++++++------
 3 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/592c7420/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 84c0679..e174b69 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -189,6 +189,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2716. DefaultSorter.isRleNeeded not thread safe
   TEZ-2847. Tez UI: Task details doesn't gets updated on manual refresh after job complete
   TEZ-2843. Tez UI: Show error if in progress fails due to AM not reachable
   TEZ-2842.
Tez UI: Update Tez App details page while in-progress http://git-wip-us.apache.org/repos/asf/tez/blob/592c7420/tez-runtime-library/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml index c158d0e..b7bb43a 100644 --- a/tez-runtime-library/findbugs-exclude.xml +++ b/tez-runtime-library/findbugs-exclude.xml @@ -93,12 +93,6 @@ </Match> <Match> - <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/> - <Field name="totalKeys"/> - <Bug pattern="IS2_INCONSISTENT_SYNC"/> - </Match> - - <Match> <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/> <Field name="PARSER"/> <Bug pattern="MS_SHOULD_BE_FINAL"/> http://git-wip-us.apache.org/repos/asf/tez/blob/592c7420/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 727f9a2..a833228 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -60,7 +60,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; import com.google.common.base.Preconditions; @SuppressWarnings({"unchecked", "rawtypes"}) -public class DefaultSorter extends ExternalSorter implements IndexedSortable { +public final class DefaultSorter extends ExternalSorter implements IndexedSortable { private static final Logger LOG = LoggerFactory.getLogger(DefaultSorter.class); @@ -704,7 +704,13 @@ public class 
DefaultSorter extends ExternalSorter implements IndexedSortable { + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec); } - sortAndSpill(); + long sameKeyCount = 0; + long totalKeysCount = 0; + synchronized (this) { + sameKeyCount = sameKey; + totalKeysCount = totalKeys; + } + sortAndSpill(sameKeyCount, totalKeysCount); } } catch (InterruptedException e) { //Reset status @@ -734,6 +740,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { protected class SpillThread extends Thread { + volatile long totalKeysCount; + volatile long sameKeyCount; + @Override public void run() { spillLock.lock(); @@ -746,7 +755,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } try { spillLock.unlock(); - sortAndSpill(); + sortAndSpill(sameKeyCount, totalKeysCount); } catch (Throwable t) { LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an exception in sortAndSpill", t); sortSpillException = t; @@ -768,6 +777,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillThreadRunning = false; } } + + public void setTotalKeysProcessed(long sameKeyCount, long totalKeysCount) { + this.sameKeyCount = sameKeyCount; + this.totalKeysCount = totalKeysCount; + } } private void checkSpillException() throws IOException { @@ -798,6 +812,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { +", kvend = " + kvend + "(" + (kvend * 4) + ")" + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec); } + spillThread.setTotalKeysProcessed(sameKey, totalKeys); spillReady.signal(); } @@ -812,16 +827,16 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { : kvmeta.capacity() + kvstart) / NMETA; } - private boolean isRLENeeded() { - return (sameKey > (0.1 * totalKeys)) || (sameKey < 0); + private boolean isRLENeeded(long sameKeyCount, long totalKeysCount) { + return (sameKeyCount > 
(0.1 * totalKeysCount)) || (sameKeyCount < 0); } - protected void sortAndSpill() + protected void sortAndSpill(long sameKeyCount, long totalKeysCount) throws IOException, InterruptedException { final int mstart = getMetaStart(); final int mend = getMetaEnd(); sorter.sort(this, mstart, mend, nullProgressable); - spill(mstart, mend); + spill(mstart, mend, sameKeyCount, totalKeysCount); } private void adjustSpillCounters(long rawLen, long compLength) { @@ -839,7 +854,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } - protected void spill(int mstart, int mend) + protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount) throws IOException, InterruptedException { //approximate the length of the output file to be the length of the @@ -859,7 +874,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { int spindex = mstart; final InMemValBytes value = createInMemValBytes(); - boolean rle = isRLENeeded(); + boolean rle = isRLENeeded(sameKeyCount, totalKeysCount); for (int i = 0; i < partitions; ++i) { IFile.Writer writer = null; try {
