Author: rohini
Date: Wed Mar 23 19:21:32 2016
New Revision: 1736376

URL: http://svn.apache.org/viewvc?rev=1736376&view=rev
Log:
PIG-4847: POPartialAgg processing and spill improvements (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 23 19:21:32 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4847: POPartialAgg processing and spill improvements (rohini)
+
 PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of no vertex groups (rohini)
 
 PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine plan (rohini)

Modified: pig/trunk/conf/pig.properties
URL: 
http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Wed Mar 23 19:21:32 2016
@@ -193,6 +193,20 @@
 #
 # pig.spill.gc.activation.size=40000000
 
+# For heaps of 1GB and less, SpillableMemoryManager triggers a spill when usage
+# of the biggest (tenured) heap exceeds this fraction of its maximum size.
+# Default is 0.7
+# pig.spill.memory.usage.threshold.fraction=0.7
+
+# For heaps of 1GB and less, SpillableMemoryManager triggers a spill when usage
+# of the biggest (tenured) heap after a garbage collection exceeds this fraction
+# of its maximum size. Default is 0.7
+# pig.spill.collection.threshold.fraction=0.7
+
+# For heaps bigger than 1GB, a fixed size is used for the collection and usage
+# thresholds instead of a fraction, to better utilize memory.
+# SpillableMemoryManager triggers a spill when the unused heap size falls below
+# this threshold (in bytes). Default is 350 MB (367001600 bytes)
+# pig.spill.unused.memory.threshold.size=367001600
+
 # Maximum amount of data to replicate using the distributed cache when doing
 # fragment-replicated join. (default: 1000000000, about 1GB) Consider 
increasing
 # this in a production environment, but carefully.

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Wed Mar 23 19:21:32 2016
@@ -387,6 +387,24 @@ public class PigConfiguration {
     public static final String PIG_TEZ_DAG_STATUS_REPORT_INTERVAL = 
"pig.tez.dag.status.report.interval";
 
 
+    // SpillableMemoryManager settings
+
+    /**
+     * Fraction of the biggest heap's maximum size at which SpillableMemoryManager
+     * triggers a spill ("memory usage threshold exceeded"). Only used for heaps of
+     * 1GB and less; bigger heaps use {@link #PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE}.
+     * Default is 0.7
+     */
+    public static final String PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION = 
"pig.spill.memory.usage.threshold.fraction";
+
+    /**
+     * Fraction of the biggest heap's maximum size at which SpillableMemoryManager
+     * triggers a spill ("collection threshold exceeded", i.e. usage measured after
+     * a garbage collection). Only used for heaps of 1GB and less; bigger heaps use
+     * {@link #PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE}.
+     * Default is 0.7
+     */
+    public static final String PIG_SPILL_COLLECTION_THRESHOLD_FRACTION = 
"pig.spill.collection.threshold.fraction";
+
+    /**
+     * For heaps bigger than 1GB, SpillableMemoryManager triggers a spill when the
+     * unused memory of the biggest heap falls below this size, in bytes.
+     * Default is 350 MB (367001600 bytes)
+     */
+    public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = 
"pig.spill.unused.memory.threshold.size";
 
     // Deprecated settings of Pig 0.13
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 Wed Mar 23 19:21:32 2016
@@ -33,7 +33,6 @@ import org.apache.log4j.PropertyConfigur
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -161,7 +160,7 @@ public abstract class PigGenericMapBase
         super.setup(context);
 
         Configuration job = context.getConfiguration();
-        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+        SpillableMemoryManager.getInstance().configure(job);
         context.getConfiguration().set(PigConstants.TASK_INDEX, 
Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
         PigMapReduce.sJobContext = context;
         PigMapReduce.sJobConfInternal.set(context.getConfiguration());

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 Wed Mar 23 19:21:32 2016
@@ -34,7 +34,6 @@ import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -313,7 +312,7 @@ public class PigGenericMapReduce {
             if (inIllustrator)
                 pack = getPack(context);
             Configuration jConf = context.getConfiguration();
-            
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+            SpillableMemoryManager.getInstance().configure(jConf);
             context.getConfiguration().set(PigConstants.TASK_INDEX, 
Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
             sJobContext = context;
             sJobConfInternal.set(context.getConfiguration());

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
 Wed Mar 23 19:21:32 2016
@@ -141,7 +141,7 @@ public class POPartialAgg extends Physic
     }
 
     private void init() throws ExecException {
-        ALL_POPARTS.put(this, null);
+
         numRecsInRawMap = 0;
         numRecsInProcessedMap = 0;
         minOutputReduction = DEFAULT_MIN_REDUCTION;
@@ -172,6 +172,9 @@ public class POPartialAgg extends Physic
             // Set them to true instead of adding another check for 
!disableMapAgg
             sizeReductionChecked = true;
             estimatedMemThresholds = true;
+        } else {
+            ALL_POPARTS.put(this, null);
+            SpillableMemoryManager.getInstance().registerSpillable(this);
         }
         // Avoid hashmap resizing. TODO: Investigate loadfactor of 0.90 or 1.0
         // newHashMapWithExpectedSize does new HashMap(expectedSize + 
expectedSize/3)
@@ -187,7 +190,6 @@ public class POPartialAgg extends Physic
             listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE);
         }
         initialized = true;
-        SpillableMemoryManager.getInstance().registerSpillable(this);
     }
 
     @Override
@@ -203,7 +205,7 @@ public class POPartialAgg extends Physic
         // The fact that we are in the latter stage is communicated via the 
doSpill
         // flag.
 
-        if (!initialized && !ALL_POPARTS.containsKey(this)) {
+        if (!initialized) {
             init();
         }
 
@@ -224,8 +226,12 @@ public class POPartialAgg extends Physic
                 if (doSpill == false) {
                     // SpillableMemoryManager requested a spill to reduce 
memory
                     // consumption. See if we can avoid it.
+                    // If the processed map holds more than 3x as many records as
+                    // the raw map, it is better to spill right after aggregating.
+                    boolean spillProcessedMap = (3 * numRecsInRawMap) < 
numRecsInProcessedMap;
                     aggregateBothLevels(false, false);
-                    if (shouldSpill()) {
+                    // Spill if the processed map dominated before aggregation,
+                    // otherwise only if shouldSpill() still asks for it.
+                    if (spillProcessedMap || shouldSpill()) {
                         startSpill(false);
                     } else {
                         LOG.info("Avoided emitting records during spill memory 
call.");
@@ -360,6 +366,7 @@ public class POPartialAgg extends Physic
         // called and size reduction checked
         startSpill(false);
         disableMapAgg = true;
+        ALL_POPARTS.remove(this);
     }
 
     private boolean mapAggDisabled() {
@@ -379,19 +386,21 @@ public class POPartialAgg extends Physic
         return shouldAggregateSecondLevel();
     }
 
+    /**
+     * Moves the value list of {@code key} into {@code map}.
+     * If {@code map} has no entry for {@code key}, {@code values} itself is stored
+     * (not copied), so the caller must not keep using it in another map; otherwise
+     * all of its tuples are appended to the existing list.
+     * Used for single-tuple lists taken from the raw input map, which can skip the
+     * value plans because there is nothing to aggregate.
+     *
+     * @param map target map
+     * @param key group key
+     * @param values tuples for {@code key}; normally exactly one
+     * @throws ExecException not thrown by this implementation; kept in the signature
+     */
+    private void addKeySingleValToMap(Map<Object, List<Tuple>> map,
+            Object key, List<Tuple> values) throws ExecException {
+        List<Tuple> existing = map.get(key);
+        if (existing == null) {
+            // Reuse the list instead of allocating a new one.
+            map.put(key, values);
+        } else {
+            // Append every tuple (not just the first) so the result does not depend
+            // on whether the key was already present in the target map.
+            existing.addAll(values);
+        }
+    }
+
     private void addKeyValToMap(Map<Object, List<Tuple>> map,
             Object key, Tuple inpTuple) throws ExecException {
         List<Tuple> value = map.get(key);
         if (value == null) {
-            if (isGroupAll) {
-                // Set exact array initial size to avoid array copies
-                // listSizeThreshold = numRecordsToSample before estimating 
memory
-                // thresholds and firstTierThreshold after memory estimation
-                int listSize = (map == rawInputMap) ? listSizeThreshold : 
Math.min(secondTierThreshold, MAX_LIST_SIZE);
-                value = new ArrayList<Tuple>(listSize);
-            } else {
-                value = new ArrayList<Tuple>();
-            }
+            value = createNewValueList(map);
             map.put(key, value);
         }
         value.add(inpTuple);
@@ -409,6 +418,20 @@ public class POPartialAgg extends Physic
         }
     }
 
+    /**
+     * Creates an empty value list for a new key in {@code map}.
+     * For group-all the list is pre-sized so it does not have to grow: lists for
+     * rawInputMap get listSizeThreshold slots, lists for any other map get
+     * secondTierThreshold + 1 slots, capped at MAX_LIST_SIZE. Otherwise a
+     * default-capacity ArrayList is returned.
+     */
+    private List<Tuple> createNewValueList(Map<Object, List<Tuple>> map) {
+        if (!isGroupAll) {
+            return new ArrayList<Tuple>();
+        }
+        // Exact initial capacity avoids array copies as tuples are appended.
+        // NOTE(review): listSizeThreshold starts as min(numRecordsToSample, MAX_LIST_SIZE)
+        // and, per the earlier comment here, becomes firstTierThreshold once memory
+        // thresholds are estimated - confirm against the estimation code.
+        int initialCapacity;
+        if (map == rawInputMap) {
+            initialCapacity = listSizeThreshold;
+        } else {
+            initialCapacity = Math.min(secondTierThreshold + 1, MAX_LIST_SIZE);
+        }
+        return new ArrayList<Tuple>(initialCapacity);
+    }
+
     private void startSpill(boolean aggregate) throws ExecException {
         // If spillingIterator is null, we are already spilling and don't need 
to set up.
         if (spillingIterator != null) return;
@@ -452,13 +475,38 @@ public class POPartialAgg extends Physic
      * @throws ExecException
      */
     private int aggregate(Map<Object, List<Tuple>> fromMap, Map<Object, 
List<Tuple>> toMap, int numEntriesInTarget) throws ExecException {
+        boolean srcDestDifferent = (fromMap == toMap) ? false : true;
         Iterator<Map.Entry<Object, List<Tuple>>> iter = 
fromMap.entrySet().iterator();
         while (iter.hasNext()) {
             Map.Entry<Object, List<Tuple>> entry = iter.next();
-            Tuple valueTuple = createValueTuple(entry.getKey(), 
entry.getValue());
-            Result res = getOutput(entry.getKey(), valueTuple);
-            iter.remove();
-            addKeyValToMap(toMap, entry.getKey(), 
getAggResultTuple(res.result));
+            if (entry.getValue().size() == 1) {
+                // If fromMap and toMap are same (processedInputMap), then 
continue without any change
+                // If different (rawInputMap->processedInputMap), add directly 
to target and skip running through the valuePlans
+                if (srcDestDifferent) {
+                    iter.remove();
+                    addKeySingleValToMap(toMap, entry.getKey(), 
entry.getValue());
+                }
+            } else {
+                Tuple valueTuple = createValueTuple(entry.getKey(), 
entry.getValue());
+                Result res = getOutput(entry.getKey(), valueTuple);
+                if (srcDestDifferent) {
+                    // Remove from src and add to destination 
(rawInputMap->processedInputMap)
+                    iter.remove();
+                    addKeyValToMap(toMap, entry.getKey(), 
getAggResultTuple(res.result));
+                } else {
+                    // Update processedInputMap in place
+                    List<Tuple> value = entry.getValue();
+                    if (isGroupAll) {
+                        value.clear();
+                    } else {
+                        // Creating a new list to free more space instead of
+                        // calling clear as the same key might not repeat again
+                        value = createNewValueList(toMap);
+                        toMap.put(entry.getKey(), value);
+                    }
+                    value.add(getAggResultTuple(res.result));
+                }
+            }
             numEntriesInTarget++;
         }
         return numEntriesInTarget;
@@ -495,9 +543,7 @@ public class POPartialAgg extends Physic
             return;
         }
         int processedTuples = numRecsInProcessedMap;
-        Map<Object, List<Tuple>> newMap = 
Maps.newHashMapWithExpectedSize(processedInputMap.size());
-        numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0);
-        processedInputMap = newMap;
+        numRecsInProcessedMap = aggregate(processedInputMap, 
processedInputMap, 0);
         LOG.info("Aggregated " + processedTuples + " processed tuples to " + 
numRecsInProcessedMap + " tuples");
     }
 
@@ -643,14 +689,13 @@ public class POPartialAgg extends Physic
                 LOG.info("Spill triggered by SpillableMemoryManager, but 
previous spill call is still not processed. Skipping");
                 return 0;
             }
-            LOG.info("Spill triggered by SpillableMemoryManager");
             synchronized(spillLock) {
                 if (rawInputMap != null) {
-                    LOG.info("Memory usage: " + getMemorySize()
-                            + ". Raw map: num keys = " + rawInputMap.size()
-                            + ", num tuples = "+ numRecsInRawMap
-                            + ", Processed map: num keys = " + 
processedInputMap.size()
-                            + ", num tuples = "+ numRecsInProcessedMap );
+                    LOG.info("Spill triggered. Memory usage: " + 
getMemorySize()
+                            + ". Raw map: keys = " + rawInputMap.size()
+                            + ", tuples = "+ numRecsInRawMap
+                            + ", Processed map: keys = " + 
processedInputMap.size()
+                            + ", tuples = "+ numRecsInProcessedMap );
                 }
                 startedContingentSpill = false;
                 doContingentSpill = true;

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 Wed Mar 23 19:21:32 2016
@@ -56,6 +56,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -128,6 +129,7 @@ public class PigProcessor extends Abstra
 
         UserPayload payload = getContext().getUserPayload();
         conf = TezUtils.createConfFromUserPayload(payload);
+        SpillableMemoryManager.getInstance().configure(conf);
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                 .deserialize(conf.get("udf.import.list")));
         PigContext pc = (PigContext) 
ObjectSerializer.deserialize(conf.get("pig.pigContext"));

Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java 
(original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Wed Mar 
23 19:21:32 2016
@@ -27,7 +27,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Properties;
 
 import javax.management.Notification;
 import javax.management.NotificationEmitter;
@@ -36,6 +35,8 @@ import javax.management.openmbean.Compos
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigConfiguration;
 
 /**
  * This class Tracks the tenured pool and a list of Spillable objects. When 
memory gets low, this
@@ -50,6 +51,11 @@ public class SpillableMemoryManager impl
 
     private static final Log log = 
LogFactory.getLog(SpillableMemoryManager.class);
 
+    // Heap size above which the fixed unused-memory threshold replaces the
+    // usage/collection threshold fractions (see configureMemoryThresholds).
+    private static final int ONE_GB = 1024 * 1024 * 1024;
+    // Default for pig.spill.unused.memory.threshold.size: 350 MB.
+    private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 * 
1024;
+    // Defaults for pig.spill.memory.usage.threshold.fraction and
+    // pig.spill.collection.threshold.fraction.
+    private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7;
+    private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7;
+
     private LinkedList<WeakReference<Spillable>> spillables = new 
LinkedList<WeakReference<Spillable>>();
     // References to spillables with size
     private LinkedList<SpillablePtr> spillablesSR = null;
@@ -68,13 +74,9 @@ public class SpillableMemoryManager impl
     // and between GC invocations
     private long accumulatedFreeSize = 0L;
 
-    // fraction of biggest heap for which we want to get
-    // "memory usage threshold exceeded" notifications
-    private double memoryThresholdFraction = 0.7;
-
-    // fraction of biggest heap for which we want to get
-    // "collection threshold exceeded" notifications
-    private double collectionMemoryThresholdFraction = 0.5;
+    // Usage threshold (bytes) set on the monitored heap for
+    // "memory usage threshold exceeded" notifications.
+    private long memoryThresholdSize = 0L;
+
+    // Collection usage threshold (bytes) set on the monitored heap for
+    // "collection threshold exceeded" notifications.
+    private long collectionThresholdSize = 0L;
 
     // log notification on usage threshold exceeded only the first time
     private boolean firstUsageThreshExceededLogged = false;
@@ -89,6 +91,8 @@ public class SpillableMemoryManager impl
 
     private volatile boolean blockRegisterOnSpill = false;
 
+    // Heap pool whose usage and collection thresholds are monitored; chosen in
+    // the constructor as a pool that supports setting a usage threshold.
+    private MemoryPoolMXBean tenuredHeap;
+
     private static final SpillableMemoryManager manager = new 
SpillableMemoryManager();
 
     //@StaticDataCleanup
@@ -100,8 +104,6 @@ public class SpillableMemoryManager impl
     private SpillableMemoryManager() {
         
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this,
 null, null);
         List<MemoryPoolMXBean> mpbeans = 
ManagementFactory.getMemoryPoolMXBeans();
-        MemoryPoolMXBean tenuredHeap = null;
-        long tenuredHeapSize = 0;
         long totalSize = 0;
         for (MemoryPoolMXBean pool : mpbeans) {
             log.debug("Found heap (" + pool.getName() + ") of type " + 
pool.getType());
@@ -111,7 +113,6 @@ public class SpillableMemoryManager impl
                 // CMS Old Gen or "tenured" is the only heap that supports
                 // setting usage threshold.
                 if (pool.isUsageThresholdSupported()) {
-                    tenuredHeapSize = size;
                     tenuredHeap = pool;
                 }
             }
@@ -120,8 +121,38 @@ public class SpillableMemoryManager impl
         if (tenuredHeap == null) {
             throw new RuntimeException("Couldn't find heap");
         }
-        log.debug("Selected heap to monitor (" +
-            tenuredHeap.getName() + ")");
+
+        configureMemoryThresholds(MEMORY_THRESHOLD_FRACTION_DEFAULT,
+                COLLECTION_THRESHOLD_FRACTION_DEFAULT,
+                UNUSED_MEMORY_THRESHOLD_DEFAULT);
+
+    }
+
+    /**
+     * Configures the thresholds for the "memory usage threshold exceeded" and
+     * "collection threshold exceeded" notifications of the monitored heap.
+     * For heaps of 1GB and less, the thresholds are memoryThresholdFraction and
+     * collectionMemoryThresholdFraction of the heap's maximum size; for bigger
+     * heaps both thresholds are set to the maximum size minus unusedMemoryThreshold.
+     *
+     * @param memoryThresholdFraction
+     *            fraction of the biggest heap's maximum size at which memory
+     *            usage threshold exceeded notifications are sent
+     * @param collectionMemoryThresholdFraction
+     *            fraction of the biggest heap's maximum size at which collection
+     *            threshold exceeded notifications are sent
+     * @param unusedMemoryThreshold
+     *            unused memory size (bytes) below which notifications are sent
+     *            for heaps bigger than 1GB
+     */
+    private void configureMemoryThresholds(double memoryThresholdFraction, 
double collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
+        long tenuredHeapSize = tenuredHeap.getUsage().getMax();
+        memoryThresholdSize = (long)(tenuredHeapSize * 
memoryThresholdFraction);
+        collectionThresholdSize = (long)(tenuredHeapSize * 
collectionMemoryThresholdFraction);
+        if (tenuredHeapSize > ONE_GB) {
+            // With a 1GB heap (the most common default) and the default 0.7
+            // fraction, spilling starts around ~700MB with ~300MB still unused.
+            // For bigger heaps we still want to spill when ~300MB is unused
+            // (plus another 50MB of buffer) rather than at 70%.
+            // E.g. for a 4GB heap, spilling starts at 3.65GB instead of 2.8GB (70%),
+            // making better use of memory.
+            memoryThresholdSize = tenuredHeapSize - unusedMemoryThreshold;
+            collectionThresholdSize = tenuredHeapSize - unusedMemoryThreshold;
+        }
 
         // we want to set both collection and usage threshold alerts to be
         // safe. In some local tests after a point only collection threshold
@@ -133,31 +164,29 @@ public class SpillableMemoryManager impl
 
         /* We set the threshold to be 50% of tenured since that is where
          * the GC starts to dominate CPU time according to Sun doc */
-        tenuredHeap.setCollectionUsageThreshold((long)(tenuredHeapSize * 
collectionMemoryThresholdFraction));
+        
tenuredHeap.setCollectionUsageThreshold((long)(collectionThresholdSize));
         // we set a higher threshold for usage threshold exceeded notification
         // since this is more likely to be effective sooner and we do not
         // want to be spilling too soon
-        tenuredHeap.setUsageThreshold((long)(tenuredHeapSize * 
memoryThresholdFraction));
+
+        tenuredHeap.setUsageThreshold((long)(memoryThresholdSize));
+        log.info("Selected heap (" + tenuredHeap.getName() + ")" + " of size " 
+ tenuredHeapSize
+                + " to monitor. collectionUsageThreshold = " + 
tenuredHeap.getCollectionUsageThreshold()
+                + ", usageThreshold = " + tenuredHeap.getUsageThreshold() );
     }
 
+    /**
+     * Returns the shared SpillableMemoryManager instance held in the static
+     * {@code manager} field; Spillables register with it and tasks configure it.
+     */
     public static SpillableMemoryManager getInstance() {
         return manager;
     }
 
-    public static void configure(Properties properties) {
-
-        try {
+    /**
+     * Applies spill settings from the job/task configuration and recomputes the
+     * heap usage and collection thresholds. Spill sizes that are not set keep
+     * their current values; threshold settings that are not set fall back to
+     * the built-in defaults.
+     */
+    public void configure(Configuration conf) {
 
-            spillFileSizeThreshold = Long.parseLong(
-                    properties.getProperty("pig.spill.size.threshold") ) ;
-
-            gcActivationSize = Long.parseLong(
-                    properties.getProperty("pig.spill.gc.activation.size") ) ;
-        }
-        catch (NumberFormatException  nfe) {
-            throw new RuntimeException("Error while converting system 
configurations" +
-                       "spill.size.threshold, spill.gc.activation.size", nfe) ;
-        }
+        // Spill sizes keep their current values when the keys are not set.
+        spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold", spillFileSizeThreshold);
+        gcActivationSize = conf.getLong("pig.spill.gc.activation.size", gcActivationSize);
+
+        // Missing threshold settings fall back to the built-in defaults.
+        configureMemoryThresholds(
+                conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION,
+                        MEMORY_THRESHOLD_FRACTION_DEFAULT),
+                conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION,
+                        COLLECTION_THRESHOLD_FRACTION_DEFAULT),
+                conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE,
+                        UNUSED_MEMORY_THRESHOLD_DEFAULT));
     }
 
     @Override
@@ -169,12 +198,11 @@ public class SpillableMemoryManager impl
         // used - heapmax/2 + heapmax/4
         long toFree = 0L;
         
if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
-            long threshold = (long)(info.getUsage().getMax() * 
memoryThresholdFraction);
-            toFree = info.getUsage().getUsed() - threshold + (long)(threshold 
* 0.5);
+            toFree = info.getUsage().getUsed() - collectionThresholdSize + 
(long)(collectionThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call- Usage threshold "
-                + info.getUsage();
+                + info.getUsage() + ", toFree = " + toFree;
             if(!firstUsageThreshExceededLogged){
                 log.info("first " + msg);
                 firstUsageThreshExceededLogged = true;
@@ -182,12 +210,11 @@ public class SpillableMemoryManager impl
                 log.debug(msg);
             }
         } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
-            long threshold = (long)(info.getUsage().getMax() * 
collectionMemoryThresholdFraction);
-            toFree = info.getUsage().getUsed() - threshold + (long)(threshold 
* 0.5);
+            toFree = info.getUsage().getUsed() - memoryThresholdSize + 
(long)(memoryThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call - Collection threshold "
-                + info.getUsage();
+                + info.getUsage() + ", toFree = " + toFree;
             if(!firstCollectionThreshExceededLogged){
                 log.info("first " + msg);
                 firstCollectionThreshExceededLogged = true;


Reply via email to