This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new f3ade6d CAMEL-16018: HazelcastReplicatedConsumer not receiving events
(#4868)
f3ade6d is described below
commit f3ade6db2f9cd4f6f23371848fe29c35774fd835
Author: Zineb BENDHIBA <[email protected]>
AuthorDate: Thu Jan 14 14:04:49 2021 +0100
CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868)
---
.../hazelcast/HazelcastComponentHelper.java | 4 +-
.../HazelcastAtomicnumberProducer.java | 10 ++--
.../hazelcast/list/HazelcastListProducer.java | 2 +-
.../hazelcast/map/HazelcastMapProducer.java | 16 +++---
.../multimap/HazelcastMultimapProducer.java | 8 +--
.../hazelcast/queue/HazelcastQueueProducer.java | 14 ++---
.../HazelcastReplicatedmapConsumer.java | 2 +-
.../ringbuffer/HazelcastRingbufferProducer.java | 14 ++---
.../hazelcast/HazelcastCamelTestSupport.java | 2 +-
.../HazelcastReplicatedmapConsumerTest.java | 64 +++++++++-------------
10 files changed, 63 insertions(+), 73 deletions(-)
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
index e3139a9..4683c61 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
@@ -41,8 +41,8 @@ public final class HazelcastComponentHelper {
}
// propagate headers if OUT message created
- if (ex.hasOut()) {
- ex.getOut().setHeaders(headers);
+ if (ex.getMessage() != null) {
+ ex.getMessage().setHeaders(headers);
}
}
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
index bea9f87..5761f39 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
@@ -92,15 +92,15 @@ public class HazelcastAtomicnumberProducer extends
HazelcastDefaultProducer {
}
private void get(Exchange exchange) {
- exchange.getOut().setBody(this.atomicnumber.get());
+ exchange.getMessage().setBody(this.atomicnumber.get());
}
private void increment(Exchange exchange) {
- exchange.getOut().setBody(this.atomicnumber.incrementAndGet());
+ exchange.getMessage().setBody(this.atomicnumber.incrementAndGet());
}
private void decrement(Exchange exchange) {
- exchange.getOut().setBody(this.atomicnumber.decrementAndGet());
+ exchange.getMessage().setBody(this.atomicnumber.decrementAndGet());
}
private void compare(long expected, Exchange exchange) {
@@ -108,12 +108,12 @@ public class HazelcastAtomicnumberProducer extends
HazelcastDefaultProducer {
if (ObjectHelper.isEmpty(expected)) {
throw new IllegalArgumentException("Expected value must be
specified");
}
- exchange.getOut().setBody(this.atomicnumber.compareAndSet(expected,
update));
+
exchange.getMessage().setBody(this.atomicnumber.compareAndSet(expected,
update));
}
private void getAndAdd(Exchange exchange) {
long delta = exchange.getIn().getBody(Long.class);
- exchange.getOut().setBody(this.atomicnumber.getAndAdd(delta));
+ exchange.getMessage().setBody(this.atomicnumber.getAndAdd(delta));
}
private void set(Exchange exchange) {
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
index db57ab6..8044083 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
@@ -114,7 +114,7 @@ public class HazelcastListProducer extends
HazelcastDefaultProducer {
}
private void get(Integer pos, Exchange exchange) {
- exchange.getOut().setBody(this.list.get(pos));
+ exchange.getMessage().setBody(this.list.get(pos));
}
private void set(Integer pos, Exchange exchange) {
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
index 6ad84f9..a54246a 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
@@ -128,7 +128,7 @@ public class HazelcastMapProducer extends
HazelcastDefaultProducer {
break;
case CLEAR:
- this.clear(exchange);
+ this.clear();
break;
case EVICT:
@@ -159,7 +159,7 @@ public class HazelcastMapProducer extends
HazelcastDefaultProducer {
} else {
result = this.cache.values();
}
- exchange.getOut().setBody(result);
+ exchange.getMessage().setBody(result);
}
/**
@@ -193,14 +193,14 @@ public class HazelcastMapProducer extends
HazelcastDefaultProducer {
* find an object by the given id and give it back
*/
private void get(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.get(oid));
+ exchange.getMessage().setBody(this.cache.get(oid));
}
/**
* GET All objects and give it back
*/
private void getAll(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.getAll((Set<Object>) oid));
+ exchange.getMessage().setBody(this.cache.getAll((Set<Object>) oid));
}
/**
@@ -239,7 +239,7 @@ public class HazelcastMapProducer extends
HazelcastDefaultProducer {
/**
* Clear all the entries
*/
- private void clear(Exchange exchange) {
+ private void clear() {
this.cache.clear();
}
@@ -261,7 +261,7 @@ public class HazelcastMapProducer extends
HazelcastDefaultProducer {
* Check for a specific key in the cache and return true if it exists or
false otherwise
*/
private void containsKey(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.containsKey(oid));
+ exchange.getMessage().setBody(this.cache.containsKey(oid));
}
/**
@@ -269,13 +269,13 @@ public class HazelcastMapProducer extends
HazelcastDefaultProducer {
*/
private void containsValue(Exchange exchange) {
Object body = exchange.getIn().getBody();
- exchange.getOut().setBody(this.cache.containsValue(body));
+ exchange.getMessage().setBody(this.cache.containsValue(body));
}
/**
* GET keys set of objects and give it back
*/
private void getKeys(Exchange exchange) {
- exchange.getOut().setBody(this.cache.keySet());
+ exchange.getMessage().setBody(this.cache.keySet());
}
}
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
index f736591..386dc51 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
@@ -99,7 +99,7 @@ public class HazelcastMultimapProducer extends
HazelcastDefaultProducer {
}
private void get(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.get(oid));
+ exchange.getMessage().setBody(this.cache.get(oid));
}
private void delete(Object oid) {
@@ -111,7 +111,7 @@ public class HazelcastMultimapProducer extends
HazelcastDefaultProducer {
}
private void valuecount(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.valueCount(oid));
+ exchange.getMessage().setBody(this.cache.valueCount(oid));
}
private void clear(Exchange exchange) {
@@ -119,11 +119,11 @@ public class HazelcastMultimapProducer extends
HazelcastDefaultProducer {
}
private void containsKey(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.containsKey(oid));
+ exchange.getMessage().setBody(this.cache.containsKey(oid));
}
private void containsValue(Exchange exchange) {
Object body = exchange.getIn().getBody();
- exchange.getOut().setBody(this.cache.containsValue(body));
+ exchange.getMessage().setBody(this.cache.containsValue(body));
}
}
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
index 2e86417..3113acf 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
@@ -127,11 +127,11 @@ public class HazelcastQueueProducer extends
HazelcastDefaultProducer {
}
private void poll(Exchange exchange) {
- exchange.getOut().setBody(this.queue.poll());
+ exchange.getMessage().setBody(this.queue.poll());
}
private void peek(Exchange exchange) {
- exchange.getOut().setBody(this.queue.peek());
+ exchange.getMessage().setBody(this.queue.peek());
}
private void offer(Exchange exchange) {
@@ -149,12 +149,12 @@ public class HazelcastQueueProducer extends
HazelcastDefaultProducer {
}
private void remainingCapacity(Exchange exchange) {
- exchange.getOut().setBody(this.queue.remainingCapacity());
+ exchange.getMessage().setBody(this.queue.remainingCapacity());
}
private void drainTo(Collection c, Exchange exchange) {
- exchange.getOut().setBody(this.queue.drainTo(c));
- exchange.getOut().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c);
+ exchange.getMessage().setBody(this.queue.drainTo(c));
+
exchange.getMessage().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c);
}
private void removeAll(Exchange exchange) {
@@ -164,11 +164,11 @@ public class HazelcastQueueProducer extends
HazelcastDefaultProducer {
private void removeIf(Exchange exchange) {
Predicate filter = exchange.getIn().getBody(Predicate.class);
- exchange.getOut().setBody(this.queue.removeIf(filter));
+ exchange.getMessage().setBody(this.queue.removeIf(filter));
}
private void take(Exchange exchange) throws InterruptedException {
- exchange.getOut().setBody(this.queue.take());
+ exchange.getMessage().setBody(this.queue.take());
}
private void retainAll(Exchange exchange) {
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
index 796d2a5..2a215fd 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
@@ -45,7 +45,7 @@ public class HazelcastReplicatedmapConsumer extends
HazelcastDefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
- listener = cache.addEntryListener(new CamelEntryListener(this,
cacheName), true);
+ listener = cache.addEntryListener(new CamelEntryListener(this,
cacheName));
}
/**
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
index e649f31..528d873 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
@@ -73,23 +73,23 @@ public class HazelcastRingbufferProducer extends
HazelcastDefaultProducer {
}
private void readOnceHead(Exchange exchange) throws InterruptedException {
-
exchange.getOut().setBody(this.ringbuffer.readOne(ringbuffer.headSequence()));
+
exchange.getMessage().setBody(this.ringbuffer.readOne(ringbuffer.headSequence()));
}
private void readOnceTail(Exchange exchange) throws InterruptedException {
-
exchange.getOut().setBody(this.ringbuffer.readOne(ringbuffer.tailSequence()));
+
exchange.getMessage().setBody(this.ringbuffer.readOne(ringbuffer.tailSequence()));
}
- private void getCapacity(Exchange exchange) throws InterruptedException {
- exchange.getOut().setBody(this.ringbuffer.capacity());
+ private void getCapacity(Exchange exchange) {
+ exchange.getMessage().setBody(this.ringbuffer.capacity());
}
- private void getRemainingCapacity(Exchange exchange) throws
InterruptedException {
- exchange.getOut().setBody(this.ringbuffer.remainingCapacity());
+ private void getRemainingCapacity(Exchange exchange) {
+ exchange.getMessage().setBody(this.ringbuffer.remainingCapacity());
}
private void add(Exchange exchange) {
final Object body = exchange.getIn().getBody();
- exchange.getOut().setBody(ringbuffer.add(body));
+ exchange.getMessage().setBody(ringbuffer.add(body));
}
}
diff --git
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
index aed1630..c3b1df7 100644
---
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
+++
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
@@ -32,7 +32,7 @@ public class HazelcastCamelTestSupport extends
CamelTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
- MockitoAnnotations.initMocks(this);
+ MockitoAnnotations.openMocks(this);
CamelContext context = super.createCamelContext();
HazelcastCamelTestHelper.registerHazelcastComponents(context,
hazelcastInstance);
trainHazelcastInstance(hazelcastInstance);
diff --git
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
index 74c854d..6a699ab 100644
---
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
+++
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
@@ -17,47 +17,45 @@
package org.apache.camel.component.hazelcast;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryEventType;
-import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.replicatedmap.ReplicatedMap;
+import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-public class HazelcastReplicatedmapConsumerTest extends
HazelcastCamelTestSupport {
+public class HazelcastReplicatedmapConsumerTest extends CamelTestSupport {
- @Mock
+ private HazelcastInstance hazelcastInstance;
private ReplicatedMap<Object, Object> map;
- @Captor
- private ArgumentCaptor<EntryListener<Object, Object>> argument;
+ @BeforeEach
+ public void beforeEach() {
+ hazelcastInstance = Hazelcast.newHazelcastInstance();
+ map = hazelcastInstance.getReplicatedMap("rm");
+ }
- @Override
- protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance)
{
- when(hazelcastInstance.getReplicatedMap("rm")).thenReturn(map);
- when(map.addEntryListener(any(),
eq(true))).thenReturn(UUID.randomUUID());
+ @AfterEach
+ public void afterEach() {
+ if (hazelcastInstance != null) {
+ hazelcastInstance.shutdown();
+ }
}
@Override
- @SuppressWarnings("unchecked")
- protected void verifyHazelcastInstance(HazelcastInstance
hazelcastInstance) {
- verify(hazelcastInstance).getReplicatedMap("rm");
- verify(map).addEntryListener(any(EntryListener.class), eq(true));
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ HazelcastCamelTestHelper.registerHazelcastComponents(context,
hazelcastInstance);
+ return context;
}
@Test
@@ -65,10 +63,7 @@ public class HazelcastReplicatedmapConsumerTest extends
HazelcastCamelTestSuppor
MockEndpoint out = getMockEndpoint("mock:added");
out.expectedMessageCount(1);
- verify(map).addEntryListener(argument.capture(), eq(true));
- EntryEvent<Object, Object> event = new EntryEvent<>("foo", null,
EntryEventType.ADDED.getType(), "4711", "my-foo");
- argument.getValue().entryAdded(event);
-
+ map.put("4711", "my-foo");
assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(),
HazelcastConstants.ADDED);
@@ -81,11 +76,8 @@ public class HazelcastReplicatedmapConsumerTest extends
HazelcastCamelTestSuppor
public void testEvict() throws InterruptedException {
MockEndpoint out = getMockEndpoint("mock:evicted");
out.expectedMessageCount(1);
-
- verify(map).addEntryListener(argument.capture(), eq(true));
- EntryEvent<Object, Object> event = new EntryEvent<>("foo", null,
EntryEventType.EVICTED.getType(), "4711", "my-foo");
- argument.getValue().entryEvicted(event);
-
+ map.put("4711", "my-foo", 100, TimeUnit.MILLISECONDS);
+ Thread.sleep(150);
assertMockEndpointsSatisfied(30000, TimeUnit.MILLISECONDS);
}
@@ -93,11 +85,8 @@ public class HazelcastReplicatedmapConsumerTest extends
HazelcastCamelTestSuppor
public void testRemove() throws InterruptedException {
MockEndpoint out = getMockEndpoint("mock:removed");
out.expectedMessageCount(1);
-
- verify(map).addEntryListener(argument.capture(), eq(true));
- EntryEvent<Object, Object> event = new EntryEvent<>("foo", null,
EntryEventType.REMOVED.getType(), "4711", "my-foo");
- argument.getValue().entryRemoved(event);
-
+ map.put("4711", "my-foo");
+ map.remove("4711");
assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(),
HazelcastConstants.REMOVED);
}
@@ -124,4 +113,5 @@ public class HazelcastReplicatedmapConsumerTest extends
HazelcastCamelTestSuppor
assertEquals("4711", headers.get(HazelcastConstants.OBJECT_ID));
assertNotNull(headers.get(HazelcastConstants.LISTENER_TIME));
}
+
}