Author: rohini
Date: Wed Mar 23 19:21:32 2016
New Revision: 1736376
URL: http://svn.apache.org/viewvc?rev=1736376&view=rev
Log:
PIG-4847: POPartialAgg processing and spill improvements (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 23 19:21:32 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4847: POPartialAgg processing and spill improvements (rohini)
+
PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of
no vertex groups (rohini)
PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine
plan (rohini)
Modified: pig/trunk/conf/pig.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Wed Mar 23 19:21:32 2016
@@ -193,6 +193,20 @@
#
# pig.spill.gc.activation.size=40000000
+# For heaps of 1GB and less, SpillableMemoryManager spill will be triggered
+# if usage of the biggest heap exceeds this fraction of its max size. Default is 0.7
+# pig.spill.memory.usage.threshold.fraction=0.7
+
+# For heaps of 1GB and less, SpillableMemoryManager spill will be triggered
+# if usage of the biggest heap after a GC exceeds this fraction of its max size. Default is 0.7
+# pig.spill.collection.threshold.fraction=0.7
+
+# For heaps bigger than 1GB, fixed-size collection and usage thresholds are
+# used to better utilize memory. SpillableMemoryManager spill will be triggered
+# if the unused heap size falls below this threshold (in bytes).
+# Default is 350 MB
+# pig.spill.unused.memory.threshold.size=367001600
+
# Maximum amount of data to replicate using the distributed cache when doing
# fragment-replicated join. (default: 1000000000, about 1GB) Consider
increasing
# this in a production environment, but carefully.
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Wed Mar 23 19:21:32 2016
@@ -387,6 +387,24 @@ public class PigConfiguration {
public static final String PIG_TEZ_DAG_STATUS_REPORT_INTERVAL =
"pig.tez.dag.status.report.interval";
+ // SpillableMemoryManager settings
+
+ /**
+ * For heaps of 1GB or less, SpillableMemoryManager spill will be triggered when usage of the biggest heap exceeds this fraction of its max size.
+ * Default is 0.7
+ */
+ public static final String PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION =
"pig.spill.memory.usage.threshold.fraction";
+
+ /**
+ * For heaps of 1GB or less, SpillableMemoryManager spill will be triggered when usage of the biggest heap after a garbage collection exceeds this fraction of its max size.
+ * Default is 0.7
+ */
+ public static final String PIG_SPILL_COLLECTION_THRESHOLD_FRACTION =
"pig.spill.collection.threshold.fraction";
+
+ /**
+ * For heaps bigger than 1GB, SpillableMemoryManager spill will be triggered when the unused heap falls below this size in bytes. Default is 350MB.
+ */
+ public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE =
"pig.spill.unused.memory.threshold.size";
// Deprecated settings of Pig 0.13
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
Wed Mar 23 19:21:32 2016
@@ -33,7 +33,6 @@ import org.apache.log4j.PropertyConfigur
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -161,7 +160,7 @@ public abstract class PigGenericMapBase
super.setup(context);
Configuration job = context.getConfiguration();
- SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+ SpillableMemoryManager.getInstance().configure(job);
context.getConfiguration().set(PigConstants.TASK_INDEX,
Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
PigMapReduce.sJobContext = context;
PigMapReduce.sJobConfInternal.set(context.getConfiguration());
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
Wed Mar 23 19:21:32 2016
@@ -34,7 +34,6 @@ import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -313,7 +312,7 @@ public class PigGenericMapReduce {
if (inIllustrator)
pack = getPack(context);
Configuration jConf = context.getConfiguration();
-
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+ SpillableMemoryManager.getInstance().configure(jConf);
context.getConfiguration().set(PigConstants.TASK_INDEX,
Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
sJobContext = context;
sJobConfInternal.set(context.getConfiguration());
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
Wed Mar 23 19:21:32 2016
@@ -141,7 +141,7 @@ public class POPartialAgg extends Physic
}
private void init() throws ExecException {
- ALL_POPARTS.put(this, null);
+
numRecsInRawMap = 0;
numRecsInProcessedMap = 0;
minOutputReduction = DEFAULT_MIN_REDUCTION;
@@ -172,6 +172,9 @@ public class POPartialAgg extends Physic
// Set them to true instead of adding another check for
!disableMapAgg
sizeReductionChecked = true;
estimatedMemThresholds = true;
+ } else {
+ ALL_POPARTS.put(this, null);
+ SpillableMemoryManager.getInstance().registerSpillable(this);
}
// Avoid hashmap resizing. TODO: Investigate loadfactor of 0.90 or 1.0
// newHashMapWithExpectedSize does new HashMap(expectedSize +
expectedSize/3)
@@ -187,7 +190,6 @@ public class POPartialAgg extends Physic
listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE);
}
initialized = true;
- SpillableMemoryManager.getInstance().registerSpillable(this);
}
@Override
@@ -203,7 +205,7 @@ public class POPartialAgg extends Physic
// The fact that we are in the latter stage is communicated via the
doSpill
// flag.
- if (!initialized && !ALL_POPARTS.containsKey(this)) {
+ if (!initialized) {
init();
}
@@ -224,8 +226,12 @@ public class POPartialAgg extends Physic
if (doSpill == false) {
// SpillableMemoryManager requested a spill to reduce
memory
// consumption. See if we can avoid it.
+ // If the processed map holds more than 3x the records of the raw map, it is better
+ // to spill after the aggregation. Checked before aggregateBothLevels(), which updates the counts.
+ boolean spillProcessedMap = (3 * numRecsInRawMap) <
numRecsInProcessedMap;
aggregateBothLevels(false, false);
- if (shouldSpill()) {
+ // Spill if the processed map dominated before aggregating, or if shouldSpill() says so
+ if (spillProcessedMap || shouldSpill()) {
startSpill(false);
} else {
LOG.info("Avoided emitting records during spill memory
call.");
@@ -360,6 +366,7 @@ public class POPartialAgg extends Physic
// called and size reduction checked
startSpill(false);
disableMapAgg = true;
+ ALL_POPARTS.remove(this);
}
private boolean mapAggDisabled() {
@@ -379,19 +386,21 @@ public class POPartialAgg extends Physic
return shouldAggregateSecondLevel();
}
+ private void addKeySingleValToMap(Map<Object, List<Tuple>> map,
+ Object key, List<Tuple> inpTuple) throws ExecException {
+ List<Tuple> existing = map.get(key);
+ if (existing != null) {
+ existing.add(inpTuple.get(0)); // key present: append the lone tuple of the single-element list
+ } else {
+ map.put(key, inpTuple); // key absent: adopt the single-element list itself, no copy
+ }
+ }
+
private void addKeyValToMap(Map<Object, List<Tuple>> map,
Object key, Tuple inpTuple) throws ExecException {
List<Tuple> value = map.get(key);
if (value == null) {
- if (isGroupAll) {
- // Set exact array initial size to avoid array copies
- // listSizeThreshold = numRecordsToSample before estimating
memory
- // thresholds and firstTierThreshold after memory estimation
- int listSize = (map == rawInputMap) ? listSizeThreshold :
Math.min(secondTierThreshold, MAX_LIST_SIZE);
- value = new ArrayList<Tuple>(listSize);
- } else {
- value = new ArrayList<Tuple>();
- }
+ value = createNewValueList(map);
map.put(key, value);
}
value.add(inpTuple);
@@ -409,6 +418,20 @@ public class POPartialAgg extends Physic
}
}
+ private List<Tuple> createNewValueList(Map<Object, List<Tuple>> map) {
+ if (!isGroupAll) {
+ // Regular group by: default-capacity list
+ return new ArrayList<Tuple>();
+ }
+ // Group all: presize exactly so adding tuples never triggers array copies.
+ // For rawInputMap the size is listSizeThreshold (numRecordsToSample before memory
+ // thresholds are estimated, firstTierThreshold after); for any other map it is
+ // secondTierThreshold + 1, capped at MAX_LIST_SIZE.
+ int initialCapacity = (map == rawInputMap)
+ ? listSizeThreshold : Math.min(secondTierThreshold + 1, MAX_LIST_SIZE);
+ return new ArrayList<Tuple>(initialCapacity);
+ }
+
private void startSpill(boolean aggregate) throws ExecException {
// If spillingIterator is null, we are already spilling and don't need
to set up.
if (spillingIterator != null) return;
@@ -452,13 +475,38 @@ public class POPartialAgg extends Physic
* @throws ExecException
*/
private int aggregate(Map<Object, List<Tuple>> fromMap, Map<Object, List<Tuple>> toMap, int numEntriesInTarget) throws ExecException {
+ boolean srcDestDifferent = fromMap != toMap;
Iterator<Map.Entry<Object, List<Tuple>>> iter = fromMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Object, List<Tuple>> entry = iter.next();
- Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
- Result res = getOutput(entry.getKey(), valueTuple);
- iter.remove();
- addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
+ if (entry.getValue().size() == 1) {
+ // If fromMap and toMap are same (processedInputMap), then continue without any change
+ // If different (rawInputMap->processedInputMap), add directly to target and skip running through the valuePlans
+ if (srcDestDifferent) {
+ iter.remove();
+ addKeySingleValToMap(toMap, entry.getKey(), entry.getValue());
+ }
+ } else {
+ Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
+ Result res = getOutput(entry.getKey(), valueTuple);
+ if (srcDestDifferent) {
+ // Remove from src and add to destination (rawInputMap->processedInputMap)
+ iter.remove();
+ addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
+ } else {
+ // Update processedInputMap in place; build the result before the list is cleared/replaced
+ Tuple aggResult = getAggResultTuple(res.result);
+ List<Tuple> value = entry.getValue();
+ if (isGroupAll) {
+ value.clear();
+ } else {
+ // New list instead of clear() frees more space, as the same key might not repeat again
+ value = createNewValueList(toMap);
+ entry.setValue(value); // Map contract: only Entry.setValue/iterator.remove are defined mid-iteration
+ }
+ value.add(aggResult);
+ }
+ }
numEntriesInTarget++;
}
return numEntriesInTarget;
@@ -495,9 +543,7 @@ public class POPartialAgg extends Physic
return;
}
int processedTuples = numRecsInProcessedMap;
- Map<Object, List<Tuple>> newMap =
Maps.newHashMapWithExpectedSize(processedInputMap.size());
- numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0);
- processedInputMap = newMap;
+ numRecsInProcessedMap = aggregate(processedInputMap,
processedInputMap, 0);
LOG.info("Aggregated " + processedTuples + " processed tuples to " +
numRecsInProcessedMap + " tuples");
}
@@ -643,14 +689,13 @@ public class POPartialAgg extends Physic
LOG.info("Spill triggered by SpillableMemoryManager, but
previous spill call is still not processed. Skipping");
return 0;
}
- LOG.info("Spill triggered by SpillableMemoryManager");
synchronized(spillLock) {
if (rawInputMap != null) {
- LOG.info("Memory usage: " + getMemorySize()
- + ". Raw map: num keys = " + rawInputMap.size()
- + ", num tuples = "+ numRecsInRawMap
- + ", Processed map: num keys = " +
processedInputMap.size()
- + ", num tuples = "+ numRecsInProcessedMap );
+ LOG.info("Spill triggered. Memory usage: " +
getMemorySize()
+ + ". Raw map: keys = " + rawInputMap.size()
+ + ", tuples = "+ numRecsInRawMap
+ + ", Processed map: keys = " +
processedInputMap.size()
+ + ", tuples = "+ numRecsInProcessedMap );
}
startedContingentSpill = false;
doContingentSpill = true;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
Wed Mar 23 19:21:32 2016
@@ -56,6 +56,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -128,6 +129,7 @@ public class PigProcessor extends Abstra
UserPayload payload = getContext().getUserPayload();
conf = TezUtils.createConfFromUserPayload(payload);
+ SpillableMemoryManager.getInstance().configure(conf);
PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
.deserialize(conf.get("udf.import.list")));
PigContext pc = (PigContext)
ObjectSerializer.deserialize(conf.get("pig.pigContext"));
Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1736376&r1=1736375&r2=1736376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
(original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Wed Mar
23 19:21:32 2016
@@ -27,7 +27,6 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Properties;
import javax.management.Notification;
import javax.management.NotificationEmitter;
@@ -36,6 +35,8 @@ import javax.management.openmbean.Compos
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigConfiguration;
/**
* This class Tracks the tenured pool and a list of Spillable objects. When
memory gets low, this
@@ -50,6 +51,11 @@ public class SpillableMemoryManager impl
private static final Log log =
LogFactory.getLog(SpillableMemoryManager.class);
+ private static final int ONE_GB = 1024 * 1024 * 1024;
+ private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 *
1024;
+ private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7;
+ private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7;
+
private LinkedList<WeakReference<Spillable>> spillables = new
LinkedList<WeakReference<Spillable>>();
// References to spillables with size
private LinkedList<SpillablePtr> spillablesSR = null;
@@ -68,13 +74,9 @@ public class SpillableMemoryManager impl
// and between GC invocations
private long accumulatedFreeSize = 0L;
- // fraction of biggest heap for which we want to get
- // "memory usage threshold exceeded" notifications
- private double memoryThresholdFraction = 0.7;
-
- // fraction of biggest heap for which we want to get
- // "collection threshold exceeded" notifications
- private double collectionMemoryThresholdFraction = 0.5;
+ private long memoryThresholdSize = 0L;
+
+ private long collectionThresholdSize = 0L;
// log notification on usage threshold exceeded only the first time
private boolean firstUsageThreshExceededLogged = false;
@@ -89,6 +91,8 @@ public class SpillableMemoryManager impl
private volatile boolean blockRegisterOnSpill = false;
+ private MemoryPoolMXBean tenuredHeap;
+
private static final SpillableMemoryManager manager = new
SpillableMemoryManager();
//@StaticDataCleanup
@@ -100,8 +104,6 @@ public class SpillableMemoryManager impl
private SpillableMemoryManager() {
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this,
null, null);
List<MemoryPoolMXBean> mpbeans =
ManagementFactory.getMemoryPoolMXBeans();
- MemoryPoolMXBean tenuredHeap = null;
- long tenuredHeapSize = 0;
long totalSize = 0;
for (MemoryPoolMXBean pool : mpbeans) {
log.debug("Found heap (" + pool.getName() + ") of type " +
pool.getType());
@@ -111,7 +113,6 @@ public class SpillableMemoryManager impl
// CMS Old Gen or "tenured" is the only heap that supports
// setting usage threshold.
if (pool.isUsageThresholdSupported()) {
- tenuredHeapSize = size;
tenuredHeap = pool;
}
}
@@ -120,8 +121,38 @@ public class SpillableMemoryManager impl
if (tenuredHeap == null) {
throw new RuntimeException("Couldn't find heap");
}
- log.debug("Selected heap to monitor (" +
- tenuredHeap.getName() + ")");
+
+ configureMemoryThresholds(MEMORY_THRESHOLD_FRACTION_DEFAULT,
+ COLLECTION_THRESHOLD_FRACTION_DEFAULT,
+ UNUSED_MEMORY_THRESHOLD_DEFAULT);
+
+ }
+
+ /**
+ * Configure thresholds for memory usage/collection threshold exceeded notifications.
+ * Uses memoryThresholdFraction and collectionMemoryThresholdFraction to configure thresholds
+ * for heaps of 1GB or less, and unusedMemoryThreshold for bigger heaps.
+ *
+ * @param memoryThresholdFraction
+ * fraction of biggest heap for which we want to get memory usage
+ * threshold exceeded notifications
+ * @param collectionMemoryThresholdFraction
+ * fraction of biggest heap for which we want to get collection
+ * threshold exceeded notifications
+ * @param unusedMemoryThreshold
+ * unused memory size (bytes) below which we want to get notifications; ignored unless in [0, heap max)
+ */
+ private void configureMemoryThresholds(double memoryThresholdFraction, double collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
+ long tenuredHeapSize = tenuredHeap.getUsage().getMax();
+ memoryThresholdSize = (long)(tenuredHeapSize * memoryThresholdFraction);
+ collectionThresholdSize = (long)(tenuredHeapSize * collectionMemoryThresholdFraction);
+ if (tenuredHeapSize > ONE_GB && unusedMemoryThreshold >= 0 && unusedMemoryThreshold < tenuredHeapSize) {
+ // A 1GB heap (the common default) spills at ~700MB with 300MB still unused under the default 0.7 fraction.
+ // Bigger heaps should likewise spill with ~300MB (+50MB buffer) unused, not at 70%: e.g. 4GB spills at ~3.65GB, not 2.8GB.
+ // A threshold outside [0, max] makes setUsageThreshold throw, so an out-of-range size keeps the fraction-based values.
+ memoryThresholdSize = tenuredHeapSize - unusedMemoryThreshold;
+ collectionThresholdSize = tenuredHeapSize - unusedMemoryThreshold;
+ }
// we want to set both collection and usage threshold alerts to be
// safe. In some local tests after a point only collection threshold
@@ -133,31 +164,29 @@ public class SpillableMemoryManager impl
-/* We set the threshold to be 50% of tenured since that is where
-* the GC starts to dominate CPU time according to Sun doc */
+/* The collection threshold is collectionThresholdSize computed above: a fraction
+* of the tenured heap, or its max minus the unused-memory margin for big heaps */
- tenuredHeap.setCollectionUsageThreshold((long)(tenuredHeapSize * collectionMemoryThresholdFraction));
+ tenuredHeap.setCollectionUsageThreshold(collectionThresholdSize);
// we set a higher threshold for usage threshold exceeded notification
// since this is more likely to be effective sooner and we do not
// want to be spilling too soon
- tenuredHeap.setUsageThreshold((long)(tenuredHeapSize * memoryThresholdFraction));
+
+ tenuredHeap.setUsageThreshold(memoryThresholdSize);
+ log.info("Selected heap (" + tenuredHeap.getName() + ")" + " of size " + tenuredHeapSize
+ + " to monitor. collectionUsageThreshold = " + tenuredHeap.getCollectionUsageThreshold()
+ + ", usageThreshold = " + tenuredHeap.getUsageThreshold() );
}
public static SpillableMemoryManager getInstance() {
return manager;
}
- public static void configure(Properties properties) {
-
- try {
+ public void configure(Configuration conf) {
- spillFileSizeThreshold = Long.parseLong(
- properties.getProperty("pig.spill.size.threshold") ) ;
-
- gcActivationSize = Long.parseLong(
- properties.getProperty("pig.spill.gc.activation.size") ) ;
- }
- catch (NumberFormatException nfe) {
- throw new RuntimeException("Error while converting system
configurations" +
- "spill.size.threshold, spill.gc.activation.size", nfe) ;
- }
+ spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold",
spillFileSizeThreshold);
+ gcActivationSize = conf.getLong("pig.spill.gc.activation.size",
gcActivationSize);
+ double memoryThresholdFraction =
conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION,
MEMORY_THRESHOLD_FRACTION_DEFAULT);
+ double collectionThresholdFraction =
conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION,
COLLECTION_THRESHOLD_FRACTION_DEFAULT);
+ long unusedMemoryThreshold =
conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE,
UNUSED_MEMORY_THRESHOLD_DEFAULT);
+ configureMemoryThresholds(memoryThresholdFraction,
collectionThresholdFraction, unusedMemoryThreshold);
}
@Override
@@ -169,12 +198,11 @@ public class SpillableMemoryManager impl
// used - heapmax/2 + heapmax/4
long toFree = 0L;
if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
- long threshold = (long)(info.getUsage().getMax() * memoryThresholdFraction);
- toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5);
+ toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5); // usage notification -> usage threshold; frees down to ~half of it
//log
String msg = "memory handler call- Usage threshold "
- + info.getUsage();
+ + info.getUsage() + ", toFree = " + toFree;
if(!firstUsageThreshExceededLogged){
log.info("first " + msg);
firstUsageThreshExceededLogged = true;
@@ -182,12 +210,11 @@ public class SpillableMemoryManager impl
log.debug(msg);
}
} else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
- long threshold = (long)(info.getUsage().getMax() * collectionMemoryThresholdFraction);
- toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5);
+ toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5); // collection notification -> collection threshold; frees down to ~half of it
//log
String msg = "memory handler call - Collection threshold "
- + info.getUsage();
+ + info.getUsage() + ", toFree = " + toFree;
if(!firstCollectionThreshExceededLogged){
log.info("first " + msg);
firstCollectionThreshExceededLogged = true;