Author: hekonsek
Date: Sun Mar 24 21:05:44 2013
New Revision: 1460463
URL: http://svn.apache.org/r1460463
Log:
Added support for custom listener interfaces.
Added:
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
- copied, changed from r1459090,
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
- copied, changed from r1459090,
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/CustomListener.java
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/DeadEventListener.java
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerConfigurationTest.java
- copied, changed from r1459090,
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumingDeadEventsTest.java
- copied, changed from r1459090,
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java
Modified:
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusComponent.java
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java
Copied:
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
(from r1459090,
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java?p2=camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java&p1=camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java&r1=1459090&r2=1460463&rev=1460463&view=diff
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
(original)
+++
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
Sun Mar 24 21:05:44 2013
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.guava.eventbus;
-import com.google.common.eventbus.Subscribe;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
@@ -26,50 +25,40 @@ import org.apache.camel.util.ObjectHelpe
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Class with public method marked with Guava @Subscribe annotation.
Responsible for receiving events from the bus and
- * sending them to the Camel infrastructure.
+/*
+ * Handler responsible for receiving events from the Guava event bus and
sending them to the Camel infrastructure.
*/
public class CamelEventHandler {
- private static final transient Logger LOG =
LoggerFactory.getLogger(CamelEventHandler.class);
- private final GuavaEventBusEndpoint eventBusEndpoint;
- private final AsyncProcessor processor;
- private final Class<?> eventClass;
+ protected final transient Logger log =
LoggerFactory.getLogger(CamelEventHandler.class);
+ protected final GuavaEventBusEndpoint eventBusEndpoint;
+ protected final AsyncProcessor processor;
- public CamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint, Processor
processor, Class<?> eventClass) {
+ public CamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint, Processor
processor) {
ObjectHelper.notNull(eventBusEndpoint, "eventBusEndpoint");
ObjectHelper.notNull(processor, "processor");
this.eventBusEndpoint = eventBusEndpoint;
this.processor = AsyncProcessorConverterHelper.convert(processor);
- this.eventClass = eventClass;
}
/**
- * Guava callback when an event was received
- * @param event the event
- * @throws Exception is thrown if error processing the even
+ * Callback executed to propagate event from Guava listener to Camel route.
+ *
+ * @param event the event received by Guava
+ * @throws Exception is thrown if error processing the event
*/
- @Subscribe
- public void eventReceived(Object event) throws Exception {
- LOG.trace("Received event: {}");
- if (eventClass == null ||
eventClass.isAssignableFrom(event.getClass())) {
- final Exchange exchange = eventBusEndpoint.createExchange(event);
- LOG.debug("Processing event: {}", event);
- // use async processor to support async routing engine
- processor.process(exchange, new AsyncCallback() {
- @Override
- public void done(boolean doneSync) {
- // noop
- }
- });
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cannot process event: {} as its class type: {} is
not assignable with: {}",
- new Object[]{event, event.getClass().getName(),
eventClass.getName()});
+ public void doEventReceived(Object event) throws Exception {
+ log.trace("Received event: {}");
+ final Exchange exchange = eventBusEndpoint.createExchange(event);
+ log.debug("Processing event: {}", event);
+ // use async processor to support async routing engine
+ processor.process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ // noop
}
- }
+ });
}
-}
+}
\ No newline at end of file
Copied:
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
(from r1459090,
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java?p2=camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java&p1=camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java&r1=1459090&r2=1460463&rev=1460463&view=diff
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
(original)
+++
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
Sun Mar 24 21:05:44 2013
@@ -17,56 +17,33 @@
package org.apache.camel.component.guava.eventbus;
import com.google.common.eventbus.Subscribe;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Class with public method marked with Guava @Subscribe annotation.
Responsible for receiving events from the bus and
- * sending them to the Camel infrastructure.
+ * Subtype of CamelEventHandler with public method marked with Guava
@Subscribe annotation.
*/
-public class CamelEventHandler {
+public class FilteringCamelEventHandler extends CamelEventHandler {
- private static final transient Logger LOG =
LoggerFactory.getLogger(CamelEventHandler.class);
- private final GuavaEventBusEndpoint eventBusEndpoint;
- private final AsyncProcessor processor;
private final Class<?> eventClass;
- public CamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint, Processor
processor, Class<?> eventClass) {
- ObjectHelper.notNull(eventBusEndpoint, "eventBusEndpoint");
- ObjectHelper.notNull(processor, "processor");
-
- this.eventBusEndpoint = eventBusEndpoint;
- this.processor = AsyncProcessorConverterHelper.convert(processor);
+ public FilteringCamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint,
Processor processor, Class<?> eventClass) {
+ super(eventBusEndpoint, processor);
this.eventClass = eventClass;
}
/**
- * Guava callback when an event was received
+ * Guava callback executed when an event was received.
+ *
* @param event the event
- * @throws Exception is thrown if error processing the even
+ * @throws Exception is thrown if error processing the event
*/
@Subscribe
public void eventReceived(Object event) throws Exception {
- LOG.trace("Received event: {}");
if (eventClass == null ||
eventClass.isAssignableFrom(event.getClass())) {
- final Exchange exchange = eventBusEndpoint.createExchange(event);
- LOG.debug("Processing event: {}", event);
- // use async processor to support async routing engine
- processor.process(exchange, new AsyncCallback() {
- @Override
- public void done(boolean doneSync) {
- // noop
- }
- });
+ doEventReceived(event);
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cannot process event: {} as its class type: {} is
not assignable with: {}",
+ if (log.isDebugEnabled()) {
+ log.debug("Cannot process event: {} as its class type: {} is
not assignable with: {}",
new Object[]{event, event.getClass().getName(),
eventClass.getName()});
}
}
Modified:
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusComponent.java?rev=1460463&r1=1460462&r2=1460463&view=diff
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusComponent.java
(original)
+++
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusComponent.java
Sun Mar 24 21:05:44 2013
@@ -26,6 +26,7 @@ import org.apache.camel.util.CamelContex
public class GuavaEventBusComponent extends DefaultComponent {
private EventBus eventBus;
+ private Class<?> listenerInterface;
@Override
protected Endpoint createEndpoint(String uri, String remaining,
Map<String, Object> parameters) throws Exception {
@@ -33,7 +34,8 @@ public class GuavaEventBusComponent exte
if (resolvedEventBus == null) {
resolvedEventBus =
CamelContextHelper.mandatoryLookup(getCamelContext(), remaining,
EventBus.class);
}
- return new GuavaEventBusEndpoint(uri, this, resolvedEventBus);
+
+ return new GuavaEventBusEndpoint(uri, this, resolvedEventBus,
listenerInterface);
}
public EventBus getEventBus() {
@@ -44,4 +46,12 @@ public class GuavaEventBusComponent exte
this.eventBus = eventBus;
}
+ public Class<?> getListenerInterface() {
+ return listenerInterface;
+ }
+
+ public void setListenerInterface(Class<?> listenerInterface) {
+ this.listenerInterface = listenerInterface;
+ }
+
}
\ No newline at end of file
Modified:
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java?rev=1460463&r1=1460462&r2=1460463&view=diff
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
(original)
+++
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
Sun Mar 24 21:05:44 2013
@@ -16,19 +16,35 @@
*/
package org.apache.camel.component.guava.eventbus;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GuavaEventBusConsumer extends DefaultConsumer {
private final EventBus eventBus;
- private final CamelEventHandler eventHandler;
+ private final Object eventHandler;
- public GuavaEventBusConsumer(GuavaEventBusEndpoint endpoint, Processor
processor, EventBus eventBus, Class<?> eventClass) {
+ public GuavaEventBusConsumer(GuavaEventBusEndpoint endpoint, Processor
processor, EventBus eventBus, Class<?> eventClass, Class<?> listenerInterface) {
super(endpoint, processor);
+
+ if (eventClass != null && listenerInterface != null) {
+ throw new IllegalStateException("You cannot set both 'eventClass'
and 'listenerInterface' parameters.");
+ }
+
this.eventBus = eventBus;
- this.eventHandler = new CamelEventHandler(endpoint, processor,
eventClass);
+ if (listenerInterface != null) {
+ this.eventHandler = createListenerInterfaceProxy(endpoint,
processor, listenerInterface);
+ } else {
+ this.eventHandler = new FilteringCamelEventHandler(endpoint,
processor, eventClass);
+ }
}
@Override
@@ -44,5 +60,33 @@ public class GuavaEventBusConsumer exten
eventBus.unregister(eventHandler);
super.doStop();
}
+
+ private Object createListenerInterfaceProxy(GuavaEventBusEndpoint
endpoint, Processor processor, Class<?> listenerInterface) {
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ return Proxy.newProxyInstance(classLoader, new
Class[]{listenerInterface}, new ListenerInterfaceHandler(endpoint, processor));
+ }
+
+ private static final class ListenerInterfaceHandler implements
InvocationHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ListenerInterfaceHandler.class);
+
+ private final CamelEventHandler delegateHandler;
+
+ private ListenerInterfaceHandler(GuavaEventBusEndpoint endpoint,
Processor processor) {
+ this.delegateHandler = new CamelEventHandler(endpoint, processor);
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
+ if (method.getAnnotation(Subscribe.class) != null) {
+ delegateHandler.doEventReceived(args[0]);
+ } else {
+ LOG.warn("Non @Subscribe method {} called on ListenerInterface
proxy.", method);
+ }
+ return null;
+ }
+
+ }
+
}
Modified:
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java?rev=1460463&r1=1460462&r2=1460463&view=diff
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
(original)
+++
camel/trunk/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
Sun Mar 24 21:05:44 2013
@@ -29,10 +29,12 @@ public class GuavaEventBusEndpoint exten
private EventBus eventBus;
private Class<?> eventClass;
+ private Class<?> listenerInterface;
- public GuavaEventBusEndpoint(String endpointUri, Component component,
EventBus eventBus) {
+ public GuavaEventBusEndpoint(String endpointUri, Component component,
EventBus eventBus, Class<?> listenerInterface) {
super(endpointUri, component);
this.eventBus = eventBus;
+ this.listenerInterface = listenerInterface;
}
@Override
@@ -42,7 +44,7 @@ public class GuavaEventBusEndpoint exten
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new GuavaEventBusConsumer(this, processor, eventBus,
eventClass);
+ return new GuavaEventBusConsumer(this, processor, eventBus,
eventClass, listenerInterface);
}
@Override
@@ -50,6 +52,11 @@ public class GuavaEventBusEndpoint exten
return true;
}
+ @Override
+ public boolean isMultipleConsumersSupported() {
+ return true;
+ }
+
public Exchange createExchange(Object event) {
Exchange exchange = createExchange();
exchange.getIn().setBody(event);
@@ -72,9 +79,12 @@ public class GuavaEventBusEndpoint exten
this.eventClass = eventClass;
}
- @Override
- public boolean isMultipleConsumersSupported() {
- return true;
+ public Class<?> getListenerInterface() {
+ return listenerInterface;
+ }
+
+ public void setListenerInterface(Class<?> listenerInterface) {
+ this.listenerInterface = listenerInterface;
}
}
Added:
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/CustomListener.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/CustomListener.java?rev=1460463&view=auto
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/CustomListener.java
(added)
+++
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/CustomListener.java
Sun Mar 24 21:05:44 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.guava.eventbus;
+
+import com.google.common.eventbus.Subscribe;
+
+public interface CustomListener {
+
+ @Subscribe
+ void eventReceived(MessageWrapper messageWrapper);
+
+}
Added:
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/DeadEventListener.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/DeadEventListener.java?rev=1460463&view=auto
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/DeadEventListener.java
(added)
+++
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/DeadEventListener.java
Sun Mar 24 21:05:44 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.guava.eventbus;
+
+import com.google.common.eventbus.DeadEvent;
+import com.google.common.eventbus.Subscribe;
+
+public interface DeadEventListener {
+
+ @Subscribe
+ void deadEventReceived(DeadEvent deadEvent);
+
+}
Copied:
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerConfigurationTest.java
(from r1459090,
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerConfigurationTest.java?p2=camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerConfigurationTest.java&p1=camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java&r1=1459090&r2=1460463&rev=1460463&view=diff
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java
(original)
+++
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerConfigurationTest.java
Sun Mar 24 21:05:44 2013
@@ -17,79 +17,30 @@
package org.apache.camel.component.guava.eventbus;
import com.google.common.eventbus.EventBus;
+import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
import org.junit.Test;
-public class GuavaEventBusConsumerTest extends CamelTestSupport {
+public class GuavaEventBusConsumerConfigurationTest {
- EventBus eventBus = new EventBus();
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
+ @Test(expected = IllegalStateException.class)
+ public void shouldForwardMessageToCamel() throws Exception {
+ // Given
+ SimpleRegistry registry = new SimpleRegistry();
+ registry.put("eventBus", new EventBus());
+ CamelContext context = new DefaultCamelContext(registry);
+ context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("guava-eventbus:eventBus").to("mock:allEvents");
- from("guava-eventbus:eventBus").to("mock:multipliedConsumer");
-
-
from("guava-eventbus:eventBus?eventClass=org.apache.camel.component.guava.eventbus.MessageWrapper").
- to("mock:wrapperEvents");
+
from("guava-eventbus:eventBus?listenerInterface=org.apache.camel.component.guava.eventbus.CustomListener&eventClass=org.apache.camel.component.guava.eventbus.MessageWrapper").
+ to("mock:customListenerEvents");
}
- };
- }
-
- @Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry registry = super.createRegistry();
- registry.bind("eventBus", eventBus);
- return registry;
- }
-
- @Test
- public void shouldForwardMessageToCamel() throws InterruptedException {
- // Given
- String message = "message";
+ });
// When
- eventBus.post(message);
-
- // Then
- getMockEndpoint("mock:allEvents").setExpectedMessageCount(1);
- assertMockEndpointsSatisfied();
- assertEquals(message,
getMockEndpoint("mock:allEvents").getExchanges().get(0).getIn().getBody());
- }
-
- @Test
- public void shouldForwardMessageToMultipleConsumers() throws
InterruptedException {
- // Given
- String message = "message";
-
- // When
- eventBus.post(message);
-
- // Then
- getMockEndpoint("mock:allEvents").setExpectedMessageCount(1);
- getMockEndpoint("mock:multipliedConsumer").setExpectedMessageCount(1);
- assertMockEndpointsSatisfied();
- assertEquals(message,
getMockEndpoint("mock:allEvents").getExchanges().get(0).getIn().getBody());
- assertEquals(message,
getMockEndpoint("mock:multipliedConsumer").getExchanges().get(0).getIn().getBody());
- }
-
- @Test
- public void shouldFilterForwardedMessages() throws InterruptedException {
- // Given
- MessageWrapper wrappedMessage = new MessageWrapper("message");
-
- // When
- eventBus.post(wrappedMessage);
- eventBus.post("String message.");
-
- // Then
- getMockEndpoint("mock:wrapperEvents").setExpectedMessageCount(1);
- assertMockEndpointsSatisfied();
- assertEquals(wrappedMessage,
getMockEndpoint("mock:wrapperEvents").getExchanges().get(0).getIn().getBody());
+ context.start();
}
}
Modified:
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java?rev=1460463&r1=1460462&r2=1460463&view=diff
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java
(original)
+++
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java
Sun Mar 24 21:05:44 2013
@@ -36,6 +36,9 @@ public class GuavaEventBusConsumerTest e
from("guava-eventbus:eventBus?eventClass=org.apache.camel.component.guava.eventbus.MessageWrapper").
to("mock:wrapperEvents");
+
+
from("guava-eventbus:eventBus?listenerInterface=org.apache.camel.component.guava.eventbus.CustomListener").
+ to("mock:customListenerEvents");
}
};
}
@@ -92,4 +95,19 @@ public class GuavaEventBusConsumerTest e
assertEquals(wrappedMessage,
getMockEndpoint("mock:wrapperEvents").getExchanges().get(0).getIn().getBody());
}
+ @Test
+ public void shouldUseCustomListener() throws InterruptedException {
+ // Given
+ MessageWrapper wrappedMessage = new MessageWrapper("message");
+
+ // When
+ eventBus.post(wrappedMessage);
+ eventBus.post("String message.");
+
+ // Then
+
getMockEndpoint("mock:customListenerEvents").setExpectedMessageCount(1);
+ assertMockEndpointsSatisfied();
+ assertEquals(wrappedMessage,
getMockEndpoint("mock:customListenerEvents").getExchanges().get(0).getIn().getBody());
+ }
+
}
Copied:
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumingDeadEventsTest.java
(from r1459090,
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumingDeadEventsTest.java?p2=camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumingDeadEventsTest.java&p1=camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java&r1=1459090&r2=1460463&rev=1460463&view=diff
==============================================================================
---
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumerTest.java
(original)
+++
camel/trunk/components/camel-guava-eventbus/src/test/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumingDeadEventsTest.java
Sun Mar 24 21:05:44 2013
@@ -16,13 +16,16 @@
*/
package org.apache.camel.component.guava.eventbus;
+import java.util.Date;
+
+import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.EventBus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
-public class GuavaEventBusConsumerTest extends CamelTestSupport {
+public class GuavaEventBusConsumingDeadEventsTest extends CamelTestSupport {
EventBus eventBus = new EventBus();
@@ -31,11 +34,11 @@ public class GuavaEventBusConsumerTest e
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("guava-eventbus:eventBus").to("mock:allEvents");
- from("guava-eventbus:eventBus").to("mock:multipliedConsumer");
+
from("guava-eventbus:eventBus?listenerInterface=org.apache.camel.component.guava.eventbus.CustomListener").
+ to("mock:customListenerEvents");
-
from("guava-eventbus:eventBus?eventClass=org.apache.camel.component.guava.eventbus.MessageWrapper").
- to("mock:wrapperEvents");
+
from("guava-eventbus:eventBus?listenerInterface=org.apache.camel.component.guava.eventbus.DeadEventListener").
+ to("mock:deadEvents");
}
};
}
@@ -50,46 +53,17 @@ public class GuavaEventBusConsumerTest e
@Test
public void shouldForwardMessageToCamel() throws InterruptedException {
// Given
- String message = "message";
+ Date message = new Date();
// When
eventBus.post(message);
// Then
- getMockEndpoint("mock:allEvents").setExpectedMessageCount(1);
+
getMockEndpoint("mock:customListenerEvents").setExpectedMessageCount(0);
assertMockEndpointsSatisfied();
- assertEquals(message,
getMockEndpoint("mock:allEvents").getExchanges().get(0).getIn().getBody());
- }
-
- @Test
- public void shouldForwardMessageToMultipleConsumers() throws
InterruptedException {
- // Given
- String message = "message";
-
- // When
- eventBus.post(message);
-
- // Then
- getMockEndpoint("mock:allEvents").setExpectedMessageCount(1);
- getMockEndpoint("mock:multipliedConsumer").setExpectedMessageCount(1);
- assertMockEndpointsSatisfied();
- assertEquals(message,
getMockEndpoint("mock:allEvents").getExchanges().get(0).getIn().getBody());
- assertEquals(message,
getMockEndpoint("mock:multipliedConsumer").getExchanges().get(0).getIn().getBody());
- }
-
- @Test
- public void shouldFilterForwardedMessages() throws InterruptedException {
- // Given
- MessageWrapper wrappedMessage = new MessageWrapper("message");
-
- // When
- eventBus.post(wrappedMessage);
- eventBus.post("String message.");
-
- // Then
- getMockEndpoint("mock:wrapperEvents").setExpectedMessageCount(1);
+ getMockEndpoint("mock:deadEvents").setExpectedMessageCount(1);
assertMockEndpointsSatisfied();
- assertEquals(wrappedMessage,
getMockEndpoint("mock:wrapperEvents").getExchanges().get(0).getIn().getBody());
+ assertEquals(message,
getMockEndpoint("mock:deadEvents").getExchanges().get(0).getIn().getBody(DeadEvent.class).getEvent());
}
}