Author: daijy
Date: Sat May 23 23:00:22 2015
New Revision: 1681399
URL: http://svn.apache.org/r1681399
Log:
PIG-4564: Pig can deadlock in POPartialAgg if there is a bag
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1681399&r1=1681398&r2=1681399&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat May 23 23:00:22 2015
@@ -82,6 +82,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4564: Pig can deadlock in POPartialAgg if there is a bag (rohini via daijy)
+
PIG-4569: Fix e2e test Rank_1 failure (rohini)
PIG-4490: MIN/MAX builtin UDFs return wrong results when accumulating for strings (xplenty via rohini)
Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1681399&r1=1681398&r2=1681399&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Sat May 23 23:00:22 2015
@@ -50,7 +50,11 @@ public class SpillableMemoryManager impl
private final Log log = LogFactory.getLog(getClass());
- LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+ private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+ // References to spillables with size
+ private LinkedList<SpillablePtr> spillablesSR = null;
+
+ private Object spillLock = new Object();
// if we freed at least this much, invoke GC
// (default 40 MB - this can be overridden by user supplied property)
@@ -62,15 +66,15 @@ public class SpillableMemoryManager impl
// this will keep track of memory freed across spills
// and between GC invocations
- private static long accumulatedFreeSize = 0L;
+ private long accumulatedFreeSize = 0L;
// fraction of biggest heap for which we want to get
// "memory usage threshold exceeded" notifications
- private static double memoryThresholdFraction = 0.7;
+ private double memoryThresholdFraction = 0.7;
// fraction of biggest heap for which we want to get
// "collection threshold exceeded" notifications
- private static double collectionMemoryThresholdFraction = 0.5;
+ private double collectionMemoryThresholdFraction = 0.5;
// log notification on usage threshold exceeded only the first time
private boolean firstUsageThreshExceededLogged = false;
@@ -80,10 +84,12 @@ public class SpillableMemoryManager impl
// fraction of the total heap used for the threshold to determine
// if we want to perform an extra gc before the spill
- private static double extraGCThresholdFraction = 0.05;
- private static long extraGCSpillSizeThreshold = 0L;
+ private double extraGCThresholdFraction = 0.05;
+ private long extraGCSpillSizeThreshold = 0L;
+
+ private volatile boolean blockRegisterOnSpill = false;
- private static volatile SpillableMemoryManager manager;
+ private static volatile SpillableMemoryManager manager = new SpillableMemoryManager();
private SpillableMemoryManager() {
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
@@ -129,9 +135,6 @@ public class SpillableMemoryManager impl
}
public static SpillableMemoryManager getInstance() {
- if (manager == null) {
- manager = new SpillableMemoryManager();
- }
return manager;
}
@@ -187,119 +190,135 @@ public class SpillableMemoryManager impl
}
}
- clearSpillables();
if (toFree < 0) {
log.debug("low memory handler returning " +
"because there is nothing to free");
return;
}
- synchronized(spillables) {
- Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() {
- /**
- * We don't lock anything, so this sort may not be stable if a WeakReference suddenly
- * becomes null, but it will be close enough.
- * Also between the time we sort and we use these spillables, they
- * may actually change in size - so this is just best effort
- */
- @Override
- public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
- Spillable o1 = o1Ref.get();
- Spillable o2 = o2Ref.get();
- if (o1 == null && o2 == null) {
- return 0;
- }
- if (o1 == null) {
- return 1;
+ // Use a separate spillLock to block multiple handleNotification calls
+ synchronized (spillLock) {
+ synchronized(spillables) {
+ spillablesSR = new LinkedList<SpillablePtr>();
+ for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
+ Spillable s = i.next().get();
+ if (s == null) {
+ i.remove();
+ continue;
}
- if (o2 == null) {
+ // Create a list with spillable size for stable sorting. Refer PIG-4012
+ spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
+ }
+ log.debug("Spillables list size: " + spillablesSR.size());
+ Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
+ @Override
+ public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
+ Spillable o1 = o1Ref.get();
+ Spillable o2 = o2Ref.get();
+ long o1Size = o1.getMemorySize();
+ long o2Size = o2.getMemorySize();
+
+ if (o1Size == o2Size) {
+ return 0;
+ }
+ if (o1Size < o2Size) {
+ return 1;
+ }
return -1;
}
- long o1Size = o1.getMemorySize();
- long o2Size = o2.getMemorySize();
+ });
+ // Block new bags from being registered
+ blockRegisterOnSpill = true;
+ }
- if (o1Size == o2Size) {
- return 0;
- }
- if (o1Size < o2Size) {
- return 1;
- }
- return -1;
- }
- });
- long estimatedFreed = 0;
- int numObjSpilled = 0;
- boolean invokeGC = false;
- boolean extraGCCalled = false;
- for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
- WeakReference<Spillable> weakRef = i.next();
- Spillable s = weakRef.get();
- // Still need to check for null here, even after we removed
- // above, because the reference may have gone bad on us
- // since the last check.
- if (s == null) {
- i.remove();
- continue;
- }
- long toBeFreed = s.getMemorySize();
- log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
- // Don't keep trying if the rest of files are too small
- if (toBeFreed < spillFileSizeThreshold) {
- log.debug("spilling small files - getting out of memory handler");
- break ;
- }
- // If single Spillable is bigger than the threshold,
- // we force GC to make sure we really need to keep this
- // object before paying for the expensive spill().
- // Done at most once per handleNotification.
- // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
- // extraGCSpillSizeThreshold and the data is always strong referenced.
- if( !extraGCCalled && extraGCSpillSizeThreshold != 0
- && toBeFreed > extraGCSpillSizeThreshold && !(s instanceof GroupingSpillable)) {
- log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
- // this extra assignment to null is needed so that gc can free the
- // spillable if nothing else is pointing at it
- s = null;
- System.gc();
- extraGCCalled = true;
- // checking again to see if this reference is still valid
- s = weakRef.get();
+ try {
+ long estimatedFreed = 0;
+ int numObjSpilled = 0;
+ boolean invokeGC = false;
+ boolean extraGCCalled = false;
+ boolean isGroupingSpillable = false;
+ for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
+ SpillablePtr sPtr = i.next();
+ Spillable s = sPtr.get();
+ // Still need to check for null here, even after we removed
+ // above, because the reference may have gone bad on us
+ // since the last check.
if (s == null) {
i.remove();
- accumulatedFreeSize = 0;
- invokeGC = false;
continue;
}
+ long toBeFreed = sPtr.getMemorySize();
+ log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
+ // Don't keep trying if the rest of files are too small
+ if (toBeFreed < spillFileSizeThreshold) {
+ log.debug("spilling small files - getting out of memory handler");
+ break ;
+ }
+ isGroupingSpillable = (s instanceof GroupingSpillable);
+ // If single Spillable is bigger than the threshold,
+ // we force GC to make sure we really need to keep this
+ // object before paying for the expensive spill().
+ // Done at most once per handleNotification.
+ // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
+ // extraGCSpillSizeThreshold and the data is always strong referenced.
+ if( !extraGCCalled && extraGCSpillSizeThreshold != 0
+ && toBeFreed > extraGCSpillSizeThreshold && !isGroupingSpillable) {
+ log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
+ // this extra assignment to null is needed so that gc can free the
+ // spillable if nothing else is pointing at it
+ s = null;
+ System.gc();
+ extraGCCalled = true;
+ // checking again to see if this reference is still valid
+ s = sPtr.get();
+ if (s == null) {
+ i.remove();
+ accumulatedFreeSize = 0;
+ invokeGC = false;
+ continue;
+ }
+ }
+ // Unblock registering of new bags temporarily as aggregation
+ // of POPartialAgg requires new record to be loaded.
+ blockRegisterOnSpill = !isGroupingSpillable;
+ try {
+ s.spill();
+ } finally {
+ blockRegisterOnSpill = true;
+ }
+
+ numObjSpilled++;
+ estimatedFreed += toBeFreed;
+ accumulatedFreeSize += toBeFreed;
+ // This should significantly reduce the number of small files
+ // in case that we have a lot of nested bags
+ if (accumulatedFreeSize > gcActivationSize) {
+ invokeGC = true;
+ }
+
+ if (estimatedFreed > toFree) {
+ log.debug("Freed enough space - getting out of memory handler");
+ invokeGC = true;
+ break;
+ }
}
- s.spill();
- numObjSpilled++;
- estimatedFreed += toBeFreed;
- accumulatedFreeSize += toBeFreed;
- // This should significantly reduce the number of small files
- // in case that we have a lot of nested bags
- if (accumulatedFreeSize > gcActivationSize) {
- invokeGC = true;
+ spillablesSR = null;
+ /* Poke the GC again to see if we successfully freed enough memory */
+ if(invokeGC) {
+ System.gc();
+ // now that we have invoked the GC, reset accumulatedFreeSize
+ accumulatedFreeSize = 0;
}
-
- if (estimatedFreed > toFree) {
- log.debug("Freed enough space - getting out of memory handler");
- invokeGC = true;
- break;
+ if(estimatedFreed > 0){
+ String msg = "Spilled an estimate of " + estimatedFreed +
+ " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
+ log.info(msg);
}
+ } finally {
+ blockRegisterOnSpill = false;
}
- /* Poke the GC again to see if we successfully freed enough memory */
- if(invokeGC) {
- System.gc();
- // now that we have invoked the GC, reset accumulatedFreeSize
- accumulatedFreeSize = 0;
- }
- if(estimatedFreed > 0){
- String msg = "Spilled an estimate of " + estimatedFreed +
- " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
- log.info(msg);
- }
-
}
+
}
public void clearSpillables() {
@@ -321,7 +340,7 @@ public class SpillableMemoryManager impl
* @param s the spillable to track.
*/
public void registerSpillable(Spillable s) {
- synchronized(spillables) {
+ synchronized (spillables) {
// Cleaing the entire list is too expensive. Just trim off the front while
// we can.
WeakReference<Spillable> first = spillables.peek();
@@ -329,7 +348,46 @@ public class SpillableMemoryManager impl
spillables.remove();
first = spillables.peek();
}
+
+ if (blockRegisterOnSpill) {
+ // When the spill is happening we do not want to register new bags
+ // save for exceptions like POPartialAgg. So block here.
+ // blockRegisterOnSpill is set to false in the finally block after spill.
+ // But just in case adding a safeguard of 5 min timeout (assuming a large
+ // spill completes within 5 mins) instead of infinitely blocking
+ // in case there are missed corner cases causing deadlock.
+ try {
+ int i = 6000;
+ for (; i > 0 && blockRegisterOnSpill; i--) {
+ Thread.sleep(50);
+ }
+ if (i == 0) {
+ log.warn("Spill took more than 5 mins. This needs investigation");
+ }
+ } catch (InterruptedException e) {
+ log.warn("Interrupted exception in registerSpillable while blocked on spill", e);
+ }
+ blockRegisterOnSpill = false;
+ }
spillables.add(new WeakReference<Spillable>(s));
}
}
+
+ private static class SpillablePtr {
+ private WeakReference<Spillable> spillable;
+ private long size;
+
+ public SpillablePtr(Spillable p, long s) {
+ spillable = new WeakReference<Spillable>(p);
+ size = s;
+ }
+
+ public Spillable get() {
+ return spillable.get();
+ }
+
+ public long getMemorySize() {
+ return size;
+ }
+ }
}