Updated Branches: refs/heads/master 5fdb8b00d -> 6726d0429
New Infinispan component. It is still alpha, not supposed to be released yet Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6726d042 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6726d042 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6726d042 Branch: refs/heads/master Commit: 6726d04298d26c07036b5b5ca87716768beff4ca Parents: 5fdb8b0 Author: Bilgin Ibryam <[email protected]> Authored: Wed Sep 11 12:29:32 2013 +0100 Committer: Bilgin Ibryam <[email protected]> Committed: Wed Sep 11 12:29:32 2013 +0100 ---------------------------------------------------------------------- components/camel-infinispan/pom.xml | 62 ++++++++++ .../InfinispanAsyncEventListener.java | 13 +++ .../infinispan/InfinispanComponent.java | 16 +++ .../infinispan/InfinispanConfiguration.java | 68 +++++++++++ .../infinispan/InfinispanConstants.java | 15 +++ .../infinispan/InfinispanConsumer.java | 65 +++++++++++ .../infinispan/InfinispanEndpoint.java | 35 ++++++ .../infinispan/InfinispanOperation.java | 74 ++++++++++++ .../infinispan/InfinispanProducer.java | 53 +++++++++ .../infinispan/InfinispanSyncEventListener.java | 45 ++++++++ .../InfinispanIdempotentRepository.java | 92 +++++++++++++++ .../org/apache/camel/component/infinispan | 1 + .../src/main/resources/log4j.properties | 16 +++ .../infinispan/InfinispanComponentTest.java | 48 ++++++++ .../infinispan/InfinispanConsumerTest.java | 41 +++++++ .../infinispan/InfinispanProducerTest.java | 115 +++++++++++++++++++ .../InfinispanRemoteProducerTest.java | 35 ++++++ .../infinispan/InfinispanSyncConsumerTest.java | 38 ++++++ .../infinispan/InfinispanTestSupport.java | 40 +++++++ ...finispanDefaultIdempotentRepositoryTest.java | 25 ++++ .../InfinispanIdempotentRepositoryTest.java | 74 ++++++++++++ components/pom.xml | 1 + parent/pom.xml | 1 + 23 files changed, 973 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml new file mode 100644 index 0000000..fe9318e --- /dev/null +++ b/components/camel-infinispan/pom.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.13-SNAPSHOT</version> + </parent> + + <artifactId>camel-infinispan</artifactId> + <packaging>bundle</packaging> + <name>Camel-Infinispan Component</name> + <description>Camel Infinispan support</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.infinispan.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=infinispan</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>org.infinispan</groupId> + <artifactId>infinispan-core</artifactId> + <version>${infinispan-version}</version> + </dependency> + <dependency> + <groupId>org.infinispan</groupId> + <artifactId>infinispan-client-hotrod</artifactId> + <version>${infinispan-version}</version> + </dependency> + + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito-version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java new file mode 100644 index 0000000..28a4786 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanAsyncEventListener.java @@ -0,0 +1,13 @@ +package org.apache.camel.component.infinispan; + +import java.util.Set; + +import org.infinispan.notifications.Listener; + +@Listener(sync = false) +public class InfinispanAsyncEventListener extends InfinispanSyncEventListener { + + public InfinispanAsyncEventListener(InfinispanConsumer consumer, Set<String> eventTypes) { + super(consumer, eventTypes); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanComponent.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanComponent.java new file mode 100644 index 0000000..b065d3d --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanComponent.java @@ -0,0 +1,16 @@ +package org.apache.camel.component.infinispan; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; + +public class InfinispanComponent extends DefaultComponent { + + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + InfinispanConfiguration configuration = new InfinispanConfiguration(); + configuration.setHost(remaining); + setProperties(configuration, parameters); + return new InfinispanEndpoint(uri, this, configuration); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java new file mode 100644 index 0000000..d6ed97f --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java @@ -0,0 +1,68 @@ +package org.apache.camel.component.infinispan; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.infinispan.commons.api.BasicCacheContainer; + +public class InfinispanConfiguration { + private BasicCacheContainer cacheContainer; + private String caseName; + private String host; + private String command; + private boolean sync = true; + private Set<String> eventTypes; + + public String getCommand() { + return command; + } + + public void setCommand(String command) { + this.command = command; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public BasicCacheContainer getCacheContainer() { + return cacheContainer; + } + + public void setCacheContainer(BasicCacheContainer cacheContainer) { + this.cacheContainer = cacheContainer; + } + + public String getCasheName() { + return caseName; + } + + public void setCaseName(String caseName) { + this.caseName = caseName; + } + + public boolean isSync() { + return sync; + } + + public void setSync(boolean sync) { + this.sync = sync; + } + + public Set<String> getEventTypes() { + return eventTypes; + } + + public void setEventTypes(Set<String> eventTypes) { + this.eventTypes = eventTypes; + } + + public void setEventTypes(String eventTypes) { + this.eventTypes = new HashSet<String>(Arrays.asList(eventTypes.split(","))); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java new file mode 100644 index 0000000..f8eb103 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java @@ -0,0 +1,15 @@ +package org.apache.camel.component.infinispan; + +interface InfinispanConstants { + String EVENT_TYPE = "CamelInfinispanEventType"; + String IS_PRE = "CamelInfinispanIsPre"; + String CACHE_NAME = "CamelInfinispanCacheName"; + String KEY = "CamelInfinispanKey"; + String VALUE = "CamelInfinispanValue"; + String OPERATION = "CamelInfinispanOperation"; + String PUT = "CamelInfinispanOperationPut"; + String GET = "CamelInfinispanOperationGet"; + String REMOVE = "CamelInfinispanOperationRemove"; + String CLEAR = "CamelInfinispanOperationClear"; + String RESULT = "CamelInfinispanOperationResult"; +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java new file mode 100644 index 0000000..ce4479b --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java @@ -0,0 +1,65 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultConsumer; +import org.infinispan.Cache; +import org.infinispan.manager.DefaultCacheManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InfinispanConsumer extends DefaultConsumer { + private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class); + private final InfinispanConfiguration configuration; + private final InfinispanSyncEventListener listener; + private DefaultCacheManager defaultCacheManager; + + public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) { + super(endpoint, processor); + this.configuration = configuration; + if (configuration.isSync()) { + listener = new InfinispanSyncEventListener(this, configuration.getEventTypes()); + } else { + listener = new InfinispanAsyncEventListener(this, configuration.getEventTypes()); + } + } + + public void processEvent(String eventType, boolean isPre, String cacheName, Object key) { + Exchange exchange = getEndpoint().createExchange(); + exchange.getOut().setHeader(InfinispanConstants.EVENT_TYPE, eventType); + exchange.getOut().setHeader(InfinispanConstants.IS_PRE, isPre); + exchange.getOut().setHeader(InfinispanConstants.CACHE_NAME, cacheName); + exchange.getOut().setHeader(InfinispanConstants.KEY, key); + + try { + getProcessor().process(exchange); + } catch (Exception e) { + LOGGER.error("Error processing event ", e); + } + } + + @Override + protected void doStart() throws Exception { + if (configuration.getCacheContainer() instanceof DefaultCacheManager) { + defaultCacheManager = (DefaultCacheManager) configuration.getCacheContainer(); + Cache<Object, Object> cache; + if (configuration.getCasheName() != null) { + cache = defaultCacheManager.getCache(configuration.getCasheName()); + } else { + cache = defaultCacheManager.getCache(); + } + cache.addListener(listener); + } else { + throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + configuration.getCacheContainer()); + } + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + if (defaultCacheManager != null) { + defaultCacheManager.removeListener(listener); + } + super.doStop(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEndpoint.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEndpoint.java new file mode 100644 index 0000000..bedeab9 --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanEndpoint.java @@ -0,0 +1,35 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; + +public class InfinispanEndpoint extends DefaultEndpoint { + private InfinispanConfiguration configuration; + + public InfinispanEndpoint() { + } + + public InfinispanEndpoint(String endpointUri) { + super(endpointUri); + } + + public InfinispanEndpoint(String uri, InfinispanComponent component, InfinispanConfiguration configuration) { + super(uri, component); + this.configuration = configuration; + } + + public Producer createProducer() throws Exception { + return new InfinispanProducer(this, configuration); + } + + public Consumer createConsumer(Processor processor) throws Exception { + return new InfinispanConsumer(this, processor, configuration); + } + + public boolean isSingleton() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java new file mode 100644 index 0000000..9aa688e --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java @@ -0,0 +1,74 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.Exchange; +import org.infinispan.commons.api.BasicCache; +import org.infinispan.commons.api.BasicCacheContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InfinispanOperation { + private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanOperation.class); + private BasicCache cache; + + public InfinispanOperation(BasicCache cache) { + this.cache = cache; + } + + public void process(Exchange exchange) { + Operation operation = getOperation(exchange); + operation.execute(cache, exchange); + } + + private Operation getOperation(Exchange exchange) { + String operation = exchange.getIn().getHeader(InfinispanConstants.OPERATION, String.class); + if (operation == null) { + operation = InfinispanConstants.PUT; + } + LOGGER.trace("Operation: [{}]", operation); + return Operation.valueOf(operation.substring(InfinispanConstants.OPERATION.length()).toUpperCase()); + } + + enum Operation { + PUT { + @Override + void execute(BasicCache cache, Exchange exchange) { + Object result = cache.put(getKey(exchange), getValue(exchange)); + setResult(result, exchange); + } + }, GET { + @Override + void execute(BasicCache cache, Exchange exchange) { + Object result = cache.get(getKey(exchange)); + setResult(result, exchange); + } + }, REMOVE { + @Override + void execute(BasicCache cache, Exchange exchange) { + Object result = cache.remove(getKey(exchange)); + setResult(result, exchange); + } + + + }, CLEAR { + @Override + void execute(BasicCache cache, Exchange exchange) { + cache.clear(); + } + }; + + void setResult(Object result, Exchange exchange) { + exchange.getIn().setHeader(InfinispanConstants.RESULT, result); + } + + Object getKey(Exchange exchange) { + return exchange.getIn().getHeader(InfinispanConstants.KEY); + } + + Object getValue(Exchange exchange) { + return exchange.getIn().getHeader(InfinispanConstants.VALUE); + } + + abstract void execute(BasicCache cache, Exchange exchange); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java new file mode 100644 index 0000000..de8b2e5d --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java @@ -0,0 +1,53 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; +import org.infinispan.commons.api.BasicCache; +import org.infinispan.commons.api.BasicCacheContainer; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InfinispanProducer extends DefaultProducer { + private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class); + private InfinispanConfiguration configuration; + private BasicCacheContainer cacheContainer; + private boolean isManagedCacheContainer; + + public InfinispanProducer(InfinispanEndpoint endpoint, InfinispanConfiguration configuration) { + super(endpoint); + this.configuration = configuration; + } + + public void process(Exchange exchange) throws Exception { + new InfinispanOperation(getCache(exchange)).process(exchange); + } + + @Override + protected void doStart() throws Exception { + cacheContainer = configuration.getCacheContainer(); + if (cacheContainer == null) { + cacheContainer = new RemoteCacheManager(configuration.getHost()); + cacheContainer.start(); + isManagedCacheContainer = true; + } + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + if (isManagedCacheContainer) { + cacheContainer.stop(); + } + super.doStop(); + } + + private BasicCache getCache(Exchange exchange) { + String cacheName = exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class); + if (cacheName == null) { + cacheName = configuration.getCasheName(); + } + LOGGER.trace("Cache[{}]", cacheName); + return cacheName != null ? cacheContainer.getCache(cacheName) : cacheContainer.getCache(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java new file mode 100644 index 0000000..987b7cb --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanSyncEventListener.java @@ -0,0 +1,45 @@ +package org.apache.camel.component.infinispan; + +import java.util.Set; + +import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryActivated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryLoaded; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited; +import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Listener(sync = true) +public class InfinispanSyncEventListener { + private final transient Logger LOGGER = LoggerFactory.getLogger(this.getClass()); + private final InfinispanConsumer infinispanConsumer; + private final Set<String> eventTypes; + + public InfinispanSyncEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) { + this.infinispanConsumer = infinispanConsumer; + this.eventTypes = eventTypes; + } + + @CacheEntryActivated + @CacheEntryCreated + @CacheEntryInvalidated + @CacheEntryLoaded + @CacheEntryModified + @CacheEntryPassivated + @CacheEntryRemoved + @CacheEntryVisited + public void processEvent(CacheEntryEvent event) { + LOGGER.trace("Received CacheEntryEvent [{}]", event); + + if (eventTypes == null || eventTypes.isEmpty() || eventTypes.contains(event.getType().toString())) { + infinispanConsumer.processEvent(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey()); + } + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java new file mode 100644 index 0000000..21a923a --- /dev/null +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java @@ -0,0 +1,92 @@ +package org.apache.camel.component.infinispan.processor.idempotent; + +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.support.ServiceSupport; +import org.infinispan.commons.api.BasicCache; +import org.infinispan.commons.api.BasicCacheContainer; +import org.infinispan.manager.DefaultCacheManager; + +@ManagedResource(description = "Infinispan based message id repository") +public class InfinispanIdempotentRepository extends ServiceSupport implements IdempotentRepository<Object> { + private final String cacheName; + private final BasicCacheContainer cacheContainer; + private boolean isManagedCacheContainer; + + public InfinispanIdempotentRepository(BasicCacheContainer cacheContainer, String cacheName) { + this.cacheContainer = cacheContainer; + this.cacheName = cacheName; + } + + public InfinispanIdempotentRepository(String cacheName) { + cacheContainer = new DefaultCacheManager(); + this.cacheName = cacheName; + isManagedCacheContainer = true; + } + + public InfinispanIdempotentRepository() { + this(null); + } + + public static InfinispanIdempotentRepository infinispanIdempotentRepository( + BasicCacheContainer cacheContainer, String processorName) { + return new InfinispanIdempotentRepository(cacheContainer, processorName); + } + + public static InfinispanIdempotentRepository infinispanIdempotentRepository(String processorName) { + return new InfinispanIdempotentRepository(processorName); + } + + public static InfinispanIdempotentRepository infinispanIdempotentRepository() { + return new InfinispanIdempotentRepository(); + } + + @ManagedOperation(description = "Adds the key to the store") + public boolean add(Object key) { + Boolean put = getCache().put(key, true); + return put == null; + } + + @ManagedOperation(description = "Does the store contain the given key") + public boolean contains(Object key) { + return getCache().containsKey(key); + } + + @ManagedOperation(description = "Remove the key from the store") + public boolean remove(Object key) { + return getCache().remove(key) != null; + } + + @ManagedAttribute(description = "The processor name") + public String getCacheName() { + return cacheName; + } + + public boolean confirm(Object key) { + return true; + } + + protected void doStart() throws Exception { + // noop + } + + protected void doStop() throws Exception { + // noop + } + + protected void doShutdown() throws Exception { + super.doShutdown(); + if (isManagedCacheContainer) { + cacheContainer.stop(); + } + } + + private BasicCache<Object, Boolean> getCache() { + return cacheName != null + ? cacheContainer.<Object, Boolean>getCache(cacheName) + : cacheContainer.<Object, Boolean>getCache(); + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/resources/META-INF/services/org/apache/camel/component/infinispan ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/resources/META-INF/services/org/apache/camel/component/infinispan b/components/camel-infinispan/src/main/resources/META-INF/services/org/apache/camel/component/infinispan new file mode 100644 index 0000000..7fe423f --- /dev/null +++ b/components/camel-infinispan/src/main/resources/META-INF/services/org/apache/camel/component/infinispan @@ -0,0 +1 @@ +class=org.apache.camel.component.infinispan.InfinispanComponent http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/resources/log4j.properties b/components/camel-infinispan/src/main/resources/log4j.properties new file mode 100644 index 0000000..4621723 --- /dev/null +++ b/components/camel-infinispan/src/main/resources/log4j.properties @@ -0,0 +1,16 @@ + +# +# The logging properties used +# +log4j.rootLogger=INFO, out + +# uncomment the following line to turn on Camel debugging +#log4j.logger.org.apache.camel=DEBUG + +# CONSOLE appender not used by default +log4j.appender.out=org.apache.log4j.ConsoleAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n +#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n + +log4j.throwableRenderer=org.apache.log4j.EnhancedThrowableRenderer \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanComponentTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanComponentTest.java new file mode 100644 index 0000000..e1c94ac --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanComponentTest.java @@ -0,0 +1,48 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; + +public class InfinispanComponentTest extends InfinispanTestSupport { + + @Test + public void consumerReceivedEntryCreatedEventNotifications() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(2); + + currentCache().put(KEY_ONE, VALUE_ONE); + assertMockEndpointsSatisfied(); + } + + @Test + public void producerPublishesKeyAndValue() throws Exception { + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + } + }); + + Object value = currentCache().get(KEY_ONE); + assertThat(value.toString(), is(VALUE_ONE)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("infinispan://localhost?cacheContainer=#cacheContainer&eventTypes=CACHE_ENTRY_CREATED") + .to("mock:result"); + + from("direct:start") + .to("infinispan://localhost?cacheContainer=#cacheContainer"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConsumerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConsumerTest.java new file mode 100644 index 0000000..55fb65d --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConsumerTest.java @@ -0,0 +1,41 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class InfinispanConsumerTest extends InfinispanTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint mockResult; + + @Test + public void consumerReceivedPreAndPostEntryCreatedEventNotifications() throws Exception { + mockResult.expectedMessageCount(2); + + mockResult.message(0).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo("CACHE_ENTRY_CREATED"); + mockResult.message(0).outHeader(InfinispanConstants.IS_PRE).isEqualTo(true); + mockResult.message(0).outHeader(InfinispanConstants.CACHE_NAME).isNotNull(); + mockResult.message(0).outHeader(InfinispanConstants.KEY).isEqualTo(KEY_ONE); + + mockResult.message(1).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo("CACHE_ENTRY_CREATED"); + mockResult.message(1).outHeader(InfinispanConstants.IS_PRE).isEqualTo(false); + mockResult.message(1).outHeader(InfinispanConstants.CACHE_NAME).isNotNull(); + mockResult.message(1).outHeader(InfinispanConstants.KEY).isEqualTo(KEY_ONE); + + currentCache().put(KEY_ONE, VALUE_ONE); + mockResult.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("infinispan://localhost?cacheContainer=#cacheContainer&sync=false&eventTypes=CACHE_ENTRY_CREATED") + .to("mock:result"); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java new file mode 100644 index 0000000..30d54a7 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java @@ -0,0 +1,115 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.core.Is.is; + +public class InfinispanProducerTest extends InfinispanTestSupport { + + @Test + public void keyAndValueArePublishedWithDefaultOperation() throws Exception { + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + } + }); + + Object value = currentCache().get(KEY_ONE); + assertThat(value.toString(), is(VALUE_ONE)); + } + + @Test + public void publishKeyAndValueByExplicitlySpecifyingTheOperation() throws Exception { + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT); + } + }); + + Object value = currentCache().get(KEY_ONE); + assertThat(value.toString(), is(VALUE_ONE)); + } + + @Test + public void putOperationReturnsThePreviousValue() throws Exception { + currentCache().put(KEY_ONE, "existing value"); + + Exchange exchange = template.request("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT); + } + }); + + assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), is("existing value")); + } + + @Test + public void retrievesAValueByKey() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.GET); + } + }); + + assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), is(VALUE_ONE)); + } + + @Test + public void deletesExistingValueByKey() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.REMOVE); + } + }); + + assertThat(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), is(VALUE_ONE)); + + Object value = currentCache().get(KEY_ONE); + assertThat(value, is(nullValue())); + } + + @Test + public void clearsAllValues() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + assertThat(currentCache().isEmpty(), is(false)); + + template.send("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.CLEAR); + } + }); + + assertThat(currentCache().isEmpty(), is(true)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:start") + .to("infinispan://localhost?cacheContainer=#cacheContainer"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteProducerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteProducerTest.java new file mode 100644 index 0000000..fd6200c --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanRemoteProducerTest.java @@ -0,0 +1,35 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore //start local server with: ./bin/startServer.sh -r hotrod +public class InfinispanRemoteProducerTest extends CamelTestSupport { + + @Test + public void producerPublishesKeyAndValue() throws Exception { + Exchange request = template.request("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, "keyOne"); + exchange.getIn().setHeader(InfinispanConstants.VALUE, "valueOne"); + } + }); + + assertNull(request.getException()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:start") + .to("infinispan://localhost"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanSyncConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanSyncConsumerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanSyncConsumerTest.java new file mode 100644 index 0000000..690663d --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanSyncConsumerTest.java @@ -0,0 +1,38 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.util.AsyncProcessorConverterHelper; +import org.junit.Test; + +public class InfinispanSyncConsumerTest extends InfinispanTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint mockResult; + + @Test + public void consumerReceivedPreAndPostEntryCreatedEventNotifications() throws Exception { + mockResult.expectedMessageCount(2); + mockResult.setMinimumResultWaitTime(900); + + currentCache().put(KEY_ONE, VALUE_ONE); + mockResult.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("infinispan://localhost?cacheContainer=#cacheContainer&sync=false&eventTypes=CACHE_ENTRY_CREATED") + + .delayer(500) + .to("mock:result"); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanTestSupport.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanTestSupport.java new file mode 100644 index 0000000..a628fa5 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanTestSupport.java @@ -0,0 +1,40 @@ +package org.apache.camel.component.infinispan; + +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.infinispan.commons.api.BasicCache; +import org.infinispan.commons.api.BasicCacheContainer; +import org.infinispan.manager.DefaultCacheManager; +import org.junit.Before; + +public class InfinispanTestSupport extends CamelTestSupport { + protected static final String KEY_ONE = "keyOne"; + protected static final String VALUE_ONE = "valueOne"; + + protected BasicCacheContainer basicCacheContainer; + + @Override + @Before + public void setUp() throws Exception { + basicCacheContainer = new DefaultCacheManager(); + basicCacheContainer.start(); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + basicCacheContainer.stop(); + super.tearDown(); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + registry.bind("cacheContainer", basicCacheContainer); + return registry; + } + + protected BasicCache<Object, Object> currentCache() { + return basicCacheContainer.getCache(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanDefaultIdempotentRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanDefaultIdempotentRepositoryTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanDefaultIdempotentRepositoryTest.java new file mode 100644 index 0000000..b10449e --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanDefaultIdempotentRepositoryTest.java @@ -0,0 +1,25 @@ +package org.apache.camel.component.infinispan.processor.idempotent; + +import org.junit.Test; + +import static org.jgroups.util.Util.assertFalse; +import static org.jgroups.util.Util.assertTrue; + +public class InfinispanDefaultIdempotentRepositoryTest { + + @Test + public void createsRepositoryUsingInternalCache() throws Exception { + InfinispanIdempotentRepository repository = InfinispanIdempotentRepository.infinispanIdempotentRepository(); + + assertFalse(repository.contains("One")); + assertFalse(repository.remove("One")); + + assertTrue(repository.add("One")); + + assertTrue(repository.contains("One")); + assertTrue(repository.remove("One")); + + assertFalse(repository.contains("One")); + assertFalse(repository.remove("One")); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepositoryTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepositoryTest.java new file mode 100644 index 0000000..24357e9 --- /dev/null +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepositoryTest.java @@ -0,0 +1,74 @@ +package org.apache.camel.component.infinispan.processor.idempotent; + +import org.infinispan.commons.api.BasicCache; +import org.infinispan.commons.api.BasicCacheContainer; +import org.infinispan.configuration.global.GlobalConfiguration; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.manager.DefaultCacheManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.jgroups.util.Util.assertFalse; +import static org.jgroups.util.Util.assertTrue; + +public class InfinispanIdempotentRepositoryTest { + protected BasicCacheContainer basicCacheContainer; + protected InfinispanIdempotentRepository idempotentRepository; + protected String cacheName = "test"; + public static final GlobalConfiguration GLOBAL_CONFIGURATION = new GlobalConfigurationBuilder().globalJmxStatistics().allowDuplicateDomains(true).build(); + + @Before + public void setUp() throws Exception { + basicCacheContainer = new DefaultCacheManager(GLOBAL_CONFIGURATION); + basicCacheContainer.start(); + idempotentRepository = InfinispanIdempotentRepository.infinispanIdempotentRepository(basicCacheContainer, cacheName); + } + + @After + public void tearDown() throws Exception { + basicCacheContainer.stop(); + } + + @Test + public void addsNewKeysToCache() throws Exception { + assertTrue(idempotentRepository.add("One")); + assertTrue(idempotentRepository.add("Two")); + + assertTrue(getCache().containsKey("One")); + assertTrue(getCache().containsKey("Two")); + } + + @Test + public void skipsAddingSecondTimeTheSameKey() throws Exception { + assertTrue(idempotentRepository.add("One")); + assertFalse(idempotentRepository.add("One")); + } + + @Test + public void containsPreviouslyAddedKey() throws Exception { + assertFalse(idempotentRepository.contains("One")); + + idempotentRepository.add("One"); + + assertTrue(idempotentRepository.contains("One")); + } + + @Test + public void removesAnExistingKey() throws Exception { + idempotentRepository.add("One"); + + assertTrue(idempotentRepository.remove("One")); + + assertFalse(idempotentRepository.contains("One")); + } + + @Test + public void doesntRemoveMissingKey() throws Exception { + assertFalse(idempotentRepository.remove("One")); + } + + private BasicCache<Object, Object> getCache() { + return basicCacheContainer.getCache(cacheName); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index 811c702..23dd79e 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -98,6 +98,7 @@ <module>camel-hl7</module> <module>camel-ibatis</module> <module>camel-ical</module> + <module>camel-infinispan</module> <module>camel-irc</module> <module>camel-jackson</module> <module>camel-javaspace</module> http://git-wip-us.apache.org/repos/asf/camel/blob/6726d042/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index f40a55d..1ba1841 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -181,6 +181,7 @@ <httpclient-version>3.1</httpclient-version> <ibatis-bundle-version>2.3.4.726_4</ibatis-bundle-version> <ibatis-version>2.3.4.726</ibatis-version> + <infinispan-version>6.0.0.Alpha1</infinispan-version> <irclib-bundle-version>1.10_5</irclib-bundle-version> <irclib-version>1.10</irclib-version> <isorelax-bundle-version>20050913_4</isorelax-bundle-version>
