Repository: camel Updated Branches: refs/heads/master 47c64ec9c -> 7eef37262
CAMEL-9056: Aggregator - Allow to clear closed correlation key cache Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7eef3726 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7eef3726 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7eef3726 Branch: refs/heads/master Commit: 7eef37262ada1824adaa647bc3c2aadf139e66e3 Parents: 47c64ec Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 5 15:51:58 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 5 15:51:58 2015 +0200 ---------------------------------------------------------------------- .../mbean/ManagedAggregateProcessorMBean.java | 8 +++++++- .../mbean/ManagedAggregateProcessor.java | 9 ++++++++- .../processor/aggregate/AggregateProcessor.java | 20 ++++++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7eef3726/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java index 845d6c7..07c1d21 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java @@ -54,7 +54,7 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean { @ManagedAttribute(description = "Ignore invalid correlation keys") boolean isIgnoreInvalidCorrelationKeys(); - @ManagedAttribute(description = "Whether to close the correlation group on completion") + @ManagedAttribute(description = "Whether to close the correlation group on completion if this value is > 0.") Integer getCloseCorrelationKeyOnCompletion(); @ManagedAttribute(description = "Parallel mode") @@ -90,6 +90,12 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean { @ManagedOperation(description = "To force complete of all groups") int forceCompletionOfAllGroups(); + @ManagedAttribute(description = "Current number of closed correlation keys in the memory cache") + int getClosedCorrelationKeysCacheSize(); + + @ManagedOperation(description = "Clear all the closed correlation keys stored in the cache") + void clearClosedCorrelationKeysCache(); + @ManagedAttribute(description = "Total number of exchanges arrived into the aggregator") long getTotalIn(); http://git-wip-us.apache.org/repos/asf/camel/blob/7eef3726/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java index 2531332..a00c63e 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java @@ -22,7 +22,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedAggregateProcessorMBean; import org.apache.camel.model.AggregateDefinition; -import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.aggregate.AggregateProcessor; import org.apache.camel.spi.ManagementStrategy; @@ -189,6 +188,14 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag } } + public int getClosedCorrelationKeysCacheSize() { + return processor.getClosedCorrelationKeysCacheSize(); + } + + public void clearClosedCorrelationKeysCache() { + processor.clearClosedCorrelationKeysCache(); + } + public long getTotalIn() { return processor.getStatistics().getTotalIn(); } http://git-wip-us.apache.org/repos/asf/camel/blob/7eef3726/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index d08cfd9..4e0dbca 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -783,6 +783,26 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor timeoutMap.put(key, exchange.getExchangeId(), timeout); } + /** + * Current number of closed correlation keys in the memory cache + */ + public int getClosedCorrelationKeysCacheSize() { + if (closedCorrelationKeys != null) { + return closedCorrelationKeys.size(); + } else { + return 0; + } + } + + /** + * Clear all the closed correlation keys stored in the cache + */ + public void clearClosedCorrelationKeysCache() { + if (closedCorrelationKeys != null) { + closedCorrelationKeys.clear(); + } + } + public AggregateProcessorStatistics getStatistics() { return statistics; }