Updated Branches: refs/heads/master a7b103d92 -> b96c92e56
CAMEL-6476: Added option to configure whether the heap memory limit is committed or max. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b96c92e5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b96c92e5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b96c92e5 Branch: refs/heads/master Commit: b96c92e562e049d4f5b27daa52c78b007fe014e5 Parents: a7b103d Author: Claus Ibsen <[email protected]> Authored: Mon Aug 12 16:01:00 2013 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Aug 12 16:01:00 2013 +0200 ---------------------------------------------------------------------- .../ManagedStreamCachingStrategyMBean.java | 7 +++++ .../impl/DefaultStreamCachingStrategy.java | 29 ++++++++++++++++---- .../mbean/ManagedStreamCachingStrategy.java | 8 ++++++ .../apache/camel/spi/StreamCachingStrategy.java | 17 ++++++++++++ .../xml/AbstractCamelContextFactoryBean.java | 6 +++- .../CamelStreamCachingStrategyDefinition.java | 11 ++++++++ 6 files changed, 71 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b96c92e5/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java index a576f81..5f6f9e3 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedStreamCachingStrategyMBean.java @@ -18,6 +18,7 @@ package org.apache.camel.api.management.mbean; import org.apache.camel.api.management.ManagedAttribute; import
org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.spi.StreamCachingStrategy; public interface ManagedStreamCachingStrategyMBean { @@ -42,6 +43,12 @@ public interface ManagedStreamCachingStrategyMBean { @ManagedAttribute(description = "Percentage (1-99) of used heap memory threshold to activate spooling to disk") int getSpoolUsedHeapMemoryThreshold(); + @ManagedAttribute(description = "Whether used heap memory limit is committed or maximum") + void setSpoolUsedHeapMemoryLimit(StreamCachingStrategy.SpoolUsedHeapMemoryLimit limit); + + @ManagedAttribute(description = "Whether used heap memory limit is committed or maximum") + StreamCachingStrategy.SpoolUsedHeapMemoryLimit getSpoolUsedHeapMemoryLimit(); + @ManagedAttribute(description = "Buffer size in bytes to use when coping between buffers") void setBufferSize(int bufferSize); http://git-wip-us.apache.org/repos/asf/camel/blob/b96c92e5/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java index ed2b767..e119a49 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultStreamCachingStrategy.java @@ -56,6 +56,7 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi private transient String spoolDirectoryName = "${java.io.tmpdir}camel-tmp-#uuid#"; private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD; private int spoolUsedHeapMemoryThreshold; + private SpoolUsedHeapMemoryLimit spoolUsedHeapMemoryLimit; private String spoolChiper; private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE; private boolean removeSpoolDirectoryWhenStopping = true; @@ -103,6 +104,14 @@ public class 
DefaultStreamCachingStrategy extends org.apache.camel.support.Servi this.spoolUsedHeapMemoryThreshold = spoolHeapMemoryWatermarkThreshold; } + public SpoolUsedHeapMemoryLimit getSpoolUsedHeapMemoryLimit() { + return spoolUsedHeapMemoryLimit; + } + + public void setSpoolUsedHeapMemoryLimit(SpoolUsedHeapMemoryLimit spoolUsedHeapMemoryLimit) { + this.spoolUsedHeapMemoryLimit = spoolUsedHeapMemoryLimit; + } + public void setSpoolThreshold(long spoolThreshold) { this.spoolThreshold = spoolThreshold; } @@ -291,7 +300,11 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi spoolRules.add(new FixedThresholdSpoolRule()); } if (spoolUsedHeapMemoryThreshold > 0) { - spoolRules.add(new UsedHeapMemorySpoolRule()); + if (spoolUsedHeapMemoryLimit == null) { + // use max by default + spoolUsedHeapMemoryLimit = SpoolUsedHeapMemoryLimit.Max; + } + spoolRules.add(new UsedHeapMemorySpoolRule(spoolUsedHeapMemoryLimit)); } } @@ -351,8 +364,10 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi private final class UsedHeapMemorySpoolRule implements SpoolRule { private final MemoryMXBean heapUsage; + private final SpoolUsedHeapMemoryLimit limit; - private UsedHeapMemorySpoolRule() { + private UsedHeapMemorySpoolRule(SpoolUsedHeapMemoryLimit limit) { + this.limit = limit; this.heapUsage = ManagementFactory.getMemoryMXBean(); } @@ -360,14 +375,16 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi if (spoolUsedHeapMemoryThreshold > 0) { // must use double to calculate with decimals for the percentage double used = heapUsage.getHeapMemoryUsage().getUsed(); - double committed = heapUsage.getHeapMemoryUsage().getCommitted(); - double calc = (used / committed) * 100; + double upper = limit == SpoolUsedHeapMemoryLimit.Committed ? 
+ heapUsage.getHeapMemoryUsage().getCommitted() : heapUsage.getHeapMemoryUsage().getMax(); + double calc = (used / upper) * 100; int percentage = (int) calc; if (LOG.isTraceEnabled()) { long u = heapUsage.getHeapMemoryUsage().getUsed(); long c = heapUsage.getHeapMemoryUsage().getCommitted(); - LOG.trace("Heap memory: [used={}M ({}%), committed={}M]", new Object[]{u >> 20, percentage, c >> 20}); + long m = heapUsage.getHeapMemoryUsage().getMax(); + LOG.trace("Heap memory: [used={}M ({}%), committed={}M, max={}M]", new Object[]{u >> 20, percentage, c >> 20, m >> 20}); } if (percentage > spoolUsedHeapMemoryThreshold) { @@ -379,7 +396,7 @@ public class DefaultStreamCachingStrategy extends org.apache.camel.support.Servi } public String toString() { - return "Spool > " + spoolUsedHeapMemoryThreshold + "% used heap memory"; + return "Spool > " + spoolUsedHeapMemoryThreshold + "% used of " + limit + " heap memory"; } } http://git-wip-us.apache.org/repos/asf/camel/blob/b96c92e5/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java index b57a010..284ac52 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedStreamCachingStrategy.java @@ -74,6 +74,14 @@ public class ManagedStreamCachingStrategy extends ManagedService implements Mana return streamCachingStrategy.getSpoolUsedHeapMemoryThreshold(); } + public void setSpoolUsedHeapMemoryLimit(StreamCachingStrategy.SpoolUsedHeapMemoryLimit limit) { + streamCachingStrategy.setSpoolUsedHeapMemoryLimit(limit); + } + + public StreamCachingStrategy.SpoolUsedHeapMemoryLimit getSpoolUsedHeapMemoryLimit() { + 
return streamCachingStrategy.getSpoolUsedHeapMemoryLimit(); + } + public void setBufferSize(int bufferSize) { streamCachingStrategy.setBufferSize(bufferSize); } http://git-wip-us.apache.org/repos/asf/camel/blob/b96c92e5/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index e6d851c..80ee1de 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -81,6 +81,13 @@ public interface StreamCachingStrategy extends StaticService { } /** + * Used for selecting if the memory limit is <tt>committed</tt> or <tt>maximum</tt> heap memory setting. + */ + enum SpoolUsedHeapMemoryLimit { + Committed, Max + } + + /** * Rule for determine if stream caching should be spooled to disk or kept in-memory. */ interface SpoolRule { @@ -139,6 +146,16 @@ public interface StreamCachingStrategy extends StaticService { int getSpoolUsedHeapMemoryThreshold(); /** + * Sets what the upper bounds should be when {@link #setSpoolUsedHeapMemoryThreshold(int)} + * is in use. + * + * @param bounds the bounds + */ + void setSpoolUsedHeapMemoryLimit(SpoolUsedHeapMemoryLimit bounds); + + SpoolUsedHeapMemoryLimit getSpoolUsedHeapMemoryLimit(); + + /** * Sets the buffer size to use when allocating in-memory buffers used for in-memory stream caches. 
* <p/> * The default size is {@link org.apache.camel.util.IOHelper#DEFAULT_BUFFER_SIZE} http://git-wip-us.apache.org/repos/asf/camel/blob/b96c92e5/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java ---------------------------------------------------------------------- diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index 76b0c69..f158d63 100644 --- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -39,7 +39,6 @@ import org.apache.camel.component.properties.PropertiesComponent; import org.apache.camel.component.properties.PropertiesParser; import org.apache.camel.component.properties.PropertiesResolver; import org.apache.camel.management.DefaultManagementAgent; -import org.apache.camel.management.DefaultManagementLifecycleStrategy; import org.apache.camel.management.DefaultManagementStrategy; import org.apache.camel.management.ManagedManagementStrategy; import org.apache.camel.model.ContextScanDefinition; @@ -404,6 +403,11 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex if (spoolUsedHeap != null) { getContext().getStreamCachingStrategy().setSpoolUsedHeapMemoryThreshold(spoolUsedHeap); } + String limit = CamelContextHelper.parseText(getContext(), streamCaching.getSpoolUsedHeapMemoryLimit()); + if (limit != null) { + StreamCachingStrategy.SpoolUsedHeapMemoryLimit ul = CamelContextHelper.mandatoryConvertTo(getContext(), StreamCachingStrategy.SpoolUsedHeapMemoryLimit.class, limit); + getContext().getStreamCachingStrategy().setSpoolUsedHeapMemoryLimit(ul); + } String spoolChiper = CamelContextHelper.parseText(getContext(), 
streamCaching.getSpoolChiper()); if (spoolChiper != null) { getContext().getStreamCachingStrategy().setSpoolChiper(spoolChiper); http://git-wip-us.apache.org/repos/asf/camel/blob/b96c92e5/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java ---------------------------------------------------------------------- diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java index 9a3ce52..d8f7781 100644 --- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java +++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/CamelStreamCachingStrategyDefinition.java @@ -48,6 +48,9 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType { private String spoolUsedHeapMemoryThreshold; @XmlAttribute + private String spoolUsedHeapMemoryLimit; + + @XmlAttribute private String spoolRules; @XmlAttribute @@ -102,6 +105,14 @@ public class CamelStreamCachingStrategyDefinition extends IdentifiedType { this.spoolUsedHeapMemoryThreshold = spoolUsedHeapMemoryThreshold; } + public String getSpoolUsedHeapMemoryLimit() { + return spoolUsedHeapMemoryLimit; + } + + public void setSpoolUsedHeapMemoryLimit(String spoolUsedHeapMemoryLimit) { + this.spoolUsedHeapMemoryLimit = spoolUsedHeapMemoryLimit; + } + public String getSpoolRules() { return spoolRules; }
