Repository: tez Updated Branches: refs/heads/branch-0.6 7514b157e -> 3875a0409
TEZ-2716. DefaultSorter.isRleNeeded not thread safe (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3875a040 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3875a040 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3875a040 Branch: refs/heads/branch-0.6 Commit: 3875a04093999463262ed003716c9e5c826bd8df Parents: 7514b15 Author: Rajesh Balamohan <[email protected]> Authored: Thu Oct 1 03:24:29 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Oct 1 03:24:29 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/sort/impl/dflt/DefaultSorter.java | 33 ++++++++++++++------ 2 files changed, 25 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3875a040/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1209e22..768db48 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2716. DefaultSorter.isRleNeeded not thread safe. TEZ-2758. Remove append API in RecoveryService after TEZ-1909. TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. TEZ-2398. Flaky test: TestFaultTolerance http://git-wip-us.apache.org/repos/asf/tez/blob/3875a040/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 69a303c..6ebc442 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -56,7 +56,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; import com.google.common.base.Preconditions; @SuppressWarnings({"unchecked", "rawtypes"}) -public class DefaultSorter extends ExternalSorter implements IndexedSortable { +public final class DefaultSorter extends ExternalSorter implements IndexedSortable { private static final Log LOG = LogFactory.getLog(DefaultSorter.class); @@ -646,7 +646,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { "); length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec); } - sortAndSpill(); + long sameKeyCount = 0; + long totalKeysCount = 0; + synchronized (this) { + sameKeyCount = sameKey; + totalKeysCount = totalKeys; + } + sortAndSpill(sameKeyCount, totalKeysCount); } } catch (InterruptedException e) { throw new IOException("Interrupted while waiting for the writer", e); @@ -679,6 +685,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { protected class SpillThread extends Thread { + volatile long totalKeysCount; + volatile long sameKeyCount; + @Override public void run() { spillLock.lock(); @@ -691,7 +700,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } try { spillLock.unlock(); - sortAndSpill(); + sortAndSpill(sameKeyCount, totalKeysCount); } catch (Throwable t) { LOG.warn("Got an exception in sortAndSpill", t); sortSpillException = t; @@ -712,6 +721,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillThreadRunning = false; } } + + public void setTotalKeysProcessed(long sameKeyCount, long totalKeysCount) { + this.sameKeyCount = sameKeyCount; + this.totalKeysCount = totalKeysCount; + } } private void checkSpillException() throws IOException { @@ -740,6 +754,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { "); length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec); } + spillThread.setTotalKeysProcessed(sameKey, totalKeys); spillReady.signal(); } @@ -754,19 +769,19 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { : kvmeta.capacity() + kvstart) / NMETA; } - private boolean isRLENeeded() { - return (sameKey > (0.1 * totalKeys)) || (sameKey < 0); + private boolean isRLENeeded(long sameKeyCount, long totalKeysCount) { + return (sameKeyCount > (0.1 * totalKeysCount)) || (sameKeyCount < 0); } - protected void sortAndSpill() + protected void sortAndSpill(long sameKeyCount, long totalKeysCount) throws IOException, InterruptedException { final int mstart = getMetaStart(); final int mend = getMetaEnd(); sorter.sort(this, mstart, mend, nullProgressable); - spill(mstart, mend); + spill(mstart, mend, sameKeyCount, totalKeysCount); } - protected void spill(int mstart, int mend) + protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount) throws IOException, InterruptedException { //approximate the length of the output file to be the length of the @@ -785,7 +800,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { int spindex = mstart; final InMemValBytes value = createInMemValBytes(); - boolean rle = isRLENeeded(); + boolean rle = isRLENeeded(sameKeyCount, totalKeysCount); for (int i = 0; i < partitions; ++i) { IFile.Writer writer = null; try {
