This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 6ece3f8 Changes in HazelcastAggregationRepository to use replicated
maps (#3841)
6ece3f8 is described below
commit 6ece3f804e5f3694d12df3b71ad225f3e34bfe0a
Author: Marco Longobardi <[email protected]>
AuthorDate: Wed May 20 14:19:55 2020 +0200
Changes in HazelcastAggregationRepository to use replicated maps (#3841)
* Create ReplicatedHazelcastAggregationRepository.java
Changes to use replicated maps
* Update HazelcastAggregationRepository.java
Changes to use replicated maps
---
.../hazelcast/HazelcastAggregationRepository.java | 22 +--
... ReplicatedHazelcastAggregationRepository.java} | 205 +++++----------------
2 files changed, 60 insertions(+), 167 deletions(-)
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
index 58fde58..a3409a2 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
@@ -57,20 +57,20 @@ public class HazelcastAggregationRepository extends
ServiceSupport
implements
RecoverableAggregationRepository,
OptimisticLockingAggregationRepository {
private static final Logger LOG =
LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName());
- private static final String COMPLETED_SUFFIX = "-completed";
+ protected static final String COMPLETED_SUFFIX = "-completed";
- private boolean optimistic;
- private boolean useLocalHzInstance;
- private boolean useRecovery = true;
+ protected boolean optimistic;
+ protected boolean useLocalHzInstance;
+ protected boolean useRecovery = true;
private IMap<String, DefaultExchangeHolder> cache;
private IMap<String, DefaultExchangeHolder> persistedCache;
- private HazelcastInstance hzInstance;
- private String mapName;
- private String persistenceMapName;
- private String deadLetterChannel;
- private long recoveryInterval = 5000;
- private int maximumRedeliveries = 3;
- private boolean allowSerializedHeaders;
+ protected HazelcastInstance hzInstance;
+ protected String mapName;
+ protected String persistenceMapName;
+ protected String deadLetterChannel;
+ protected long recoveryInterval = 5000;
+ protected int maximumRedeliveries = 3;
+ protected boolean allowSerializedHeaders;
/**
* Creates new {@link HazelcastAggregationRepository} that defaults to
non-optimistic locking
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java
similarity index 65%
copy from
components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
copy to
components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java
index 58fde58..21cec1a 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/ReplicatedHazelcastAggregationRepository.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor.aggregate.hazelcast;
import java.util.Collections;
import java.util.Set;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -46,132 +47,100 @@ import org.slf4j.LoggerFactory;
* {@link RecoverableAggregationRepository} and {@link
OptimisticLockingAggregationRepository}.
* Defaults to thread-safe (non-optimistic) locking and recoverable strategy.
* Hazelcast settings are given to an end-user and can be controlled with
repositoryName and persistentRespositoryName,
- * both are {@link com.hazelcast.map.IMap} <String, Exchange>. However
HazelcastAggregationRepository
+ * both are {@link com.hazelcast.map.IMap} <String, Exchange>. However
ReplicatedHazelcastAggregationRepository
* can run it's own Hazelcast instance, but obviously no benefits of Hazelcast
clustering are gained this way.
- * If the {@link HazelcastAggregationRepository} uses it's own local {@link
HazelcastInstance} it will DESTROY this
+ * If the {@link ReplicatedHazelcastAggregationRepository} uses it's own local
{@link HazelcastInstance} it will DESTROY this
* instance on {@link #doStop()}. You should control {@link HazelcastInstance}
lifecycle yourself whenever you instantiate
- * {@link HazelcastAggregationRepository} passing a reference to the instance.
+ * {@link ReplicatedHazelcastAggregationRepository} passing a reference to the
instance.
*
*/
-public class HazelcastAggregationRepository extends ServiceSupport
- implements
RecoverableAggregationRepository,
-
OptimisticLockingAggregationRepository {
- private static final Logger LOG =
LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName());
- private static final String COMPLETED_SUFFIX = "-completed";
-
- private boolean optimistic;
- private boolean useLocalHzInstance;
- private boolean useRecovery = true;
- private IMap<String, DefaultExchangeHolder> cache;
- private IMap<String, DefaultExchangeHolder> persistedCache;
- private HazelcastInstance hzInstance;
- private String mapName;
- private String persistenceMapName;
- private String deadLetterChannel;
- private long recoveryInterval = 5000;
- private int maximumRedeliveries = 3;
- private boolean allowSerializedHeaders;
+public class ReplicatedHazelcastAggregationRepository extends
HazelcastAggregationRepository {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReplicatedHazelcastAggregationRepository.class.getName());
+ protected Map<String, DefaultExchangeHolder> replicatedCache;
+ protected Map<String, DefaultExchangeHolder> replicatedPersistedCache;
/**
- * Creates new {@link HazelcastAggregationRepository} that defaults to
non-optimistic locking
+ * Creates new {@link ReplicatedHazelcastAggregationRepository} that
defaults to non-optimistic locking
* with recoverable behavior and a local Hazelcast instance. Recoverable
repository name defaults to
* {@code repositoryName} + "-compeleted".
* @param repositoryName {@link IMap} repository name;
*/
- public HazelcastAggregationRepository(final String repositoryName) {
- mapName = repositoryName;
- persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX);
- optimistic = false;
- useLocalHzInstance = true;
+ public ReplicatedHazelcastAggregationRepository(final String
repositoryName) {
+ super(repositoryName);
}
/**
- * Creates new {@link HazelcastAggregationRepository} that defaults to
non-optimistic locking
+ * Creates new {@link ReplicatedHazelcastAggregationRepository} that
defaults to non-optimistic locking
* with recoverable behavior and a local Hazelcast instance.
* @param repositoryName {@link IMap} repository name;
* @param persistentRepositoryName {@link IMap} recoverable repository
name;
*/
- public HazelcastAggregationRepository(final String repositoryName, final
String persistentRepositoryName) {
- mapName = repositoryName;
- persistenceMapName = persistentRepositoryName;
- optimistic = false;
- useLocalHzInstance = true;
+ public ReplicatedHazelcastAggregationRepository(final String
repositoryName, final String persistentRepositoryName) {
+ super(repositoryName,persistentRepositoryName);
}
/**
- * Creates new {@link HazelcastAggregationRepository} with recoverable
behavior and a local Hazelcast instance.
+ * Creates new {@link ReplicatedHazelcastAggregationRepository} with
recoverable behavior and a local Hazelcast instance.
* Recoverable repository name defaults to {@code repositoryName} +
"-compeleted".
* @param repositoryName {@link IMap} repository name;
* @param optimistic whether to use optimistic locking manner.
*/
- public HazelcastAggregationRepository(final String repositoryName, boolean
optimistic) {
- this(repositoryName);
- this.optimistic = optimistic;
- useLocalHzInstance = true;
+ public ReplicatedHazelcastAggregationRepository(final String
repositoryName, boolean optimistic) {
+ super(repositoryName,optimistic);
}
/**
- * Creates new {@link HazelcastAggregationRepository} with recoverable
behavior and a local Hazelcast instance.
+ * Creates new {@link ReplicatedHazelcastAggregationRepository} with
recoverable behavior and a local Hazelcast instance.
* @param repositoryName {@link IMap} repository name;
* @param persistentRepositoryName {@link IMap} recoverable repository
name;
* @param optimistic whether to use optimistic locking manner.
*/
- public HazelcastAggregationRepository(final String repositoryName, final
String persistentRepositoryName, boolean optimistic) {
- this(repositoryName, persistentRepositoryName);
- this.optimistic = optimistic;
- useLocalHzInstance = true;
+ public ReplicatedHazelcastAggregationRepository(final String
repositoryName, final String persistentRepositoryName, boolean optimistic) {
+ super(repositoryName,persistentRepositoryName,optimistic);
}
/**
- * Creates new {@link HazelcastAggregationRepository} that defaults to
non-optimistic locking
+ * Creates new {@link ReplicatedHazelcastAggregationRepository} that
defaults to non-optimistic locking
* with recoverable behavior. Recoverable repository name defaults to
* {@code repositoryName} + "-compeleted".
* @param repositoryName {@link IMap} repository name;
* @param hzInstanse externally configured {@link HazelcastInstance}.
*/
- public HazelcastAggregationRepository(final String repositoryName,
HazelcastInstance hzInstanse) {
- this (repositoryName, false);
- this.hzInstance = hzInstanse;
- useLocalHzInstance = false;
+ public ReplicatedHazelcastAggregationRepository(final String
repositoryName, HazelcastInstance hzInstanse) {
+ super(repositoryName,hzInstanse);
}
/**
- * Creates new {@link HazelcastAggregationRepository} that defaults to
non-optimistic locking
+ * Creates new {@link ReplicatedHazelcastAggregationRepository} that
defaults to non-optimistic locking
* with recoverable behavior.
* @param repositoryName {@link IMap} repository name;
* @param persistentRepositoryName {@link IMap} recoverable repository
name;
* @param hzInstanse externally configured {@link HazelcastInstance}.
*/
- public HazelcastAggregationRepository(final String repositoryName, final
String persistentRepositoryName, HazelcastInstance hzInstanse) {
- this (repositoryName, persistentRepositoryName, false);
- this.hzInstance = hzInstanse;
- useLocalHzInstance = false;
+ public ReplicatedHazelcastAggregationRepository(final String
repositoryName, final String persistentRepositoryName, HazelcastInstance
hzInstanse) {
+ super(repositoryName, persistentRepositoryName, hzInstanse);
}
/**
- * Creates new {@link HazelcastAggregationRepository} with recoverable
behavior.
+ * Creates new {@link ReplicatedHazelcastAggregationRepository} with
recoverable behavior.
* Recoverable repository name defaults to {@code repositoryName} +
"-compeleted".
* @param repositoryName {@link IMap} repository name;
* @param optimistic whether to use optimistic locking manner;
* @param hzInstance externally configured {@link HazelcastInstance}.
*/
- public HazelcastAggregationRepository(final String repositoryName, boolean
optimistic, HazelcastInstance hzInstance) {
- this(repositoryName, optimistic);
- this.hzInstance = hzInstance;
- useLocalHzInstance = false;
+ public ReplicatedHazelcastAggregationRepository(final String
repositoryName, boolean optimistic, HazelcastInstance hzInstance) {
+ super(repositoryName,optimistic,hzInstance);
}
/**
- * Creates new {@link HazelcastAggregationRepository} with recoverable
behavior.
+ * Creates new {@link ReplicatedHazelcastAggregationRepository} with
recoverable behavior.
* @param repositoryName {@link IMap} repository name;
* @param optimistic whether to use optimistic locking manner;
* @param persistentRepositoryName {@link IMap} recoverable repository
name;
* @param hzInstance externally configured {@link HazelcastInstance}.
*/
- public HazelcastAggregationRepository(final String repositoryName, final
String persistentRepositoryName, boolean optimistic, HazelcastInstance
hzInstance) {
- this(repositoryName, persistentRepositoryName, optimistic);
- this.hzInstance = hzInstance;
- useLocalHzInstance = false;
+ public ReplicatedHazelcastAggregationRepository(final String
repositoryName, final String persistentRepositoryName, boolean optimistic,
HazelcastInstance hzInstance) {
+ super(repositoryName,persistentRepositoryName,optimistic,hzInstance);
}
@Override
@@ -182,7 +151,7 @@ public class HazelcastAggregationRepository extends
ServiceSupport
LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic
manner.", newExchange.getExchangeId(), key);
if (oldExchange == null) {
DefaultExchangeHolder holder =
DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
- final DefaultExchangeHolder misbehaviorHolder =
cache.putIfAbsent(key, holder);
+ final DefaultExchangeHolder misbehaviorHolder =
replicatedCache.putIfAbsent(key, holder);
if (misbehaviorHolder != null) {
Exchange misbehaviorEx = unmarshallExchange(camelContext,
misbehaviorHolder);
LOG.error("Optimistic locking failed for exchange with key {}:
IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges
to be returned",
@@ -192,7 +161,7 @@ public class HazelcastAggregationRepository extends
ServiceSupport
} else {
DefaultExchangeHolder oldHolder =
DefaultExchangeHolder.marshal(oldExchange, true, allowSerializedHeaders);
DefaultExchangeHolder newHolder =
DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
- if (!cache.replace(key, oldHolder, newHolder)) {
+ if (!replicatedCache.replace(key, oldHolder, newHolder)) {
LOG.error("Optimistic locking failed for exchange with key {}:
IMap#replace returned no Exchanges, while it's expected to replace one",
key);
throw new OptimisticLockingException();
@@ -212,7 +181,7 @@ public class HazelcastAggregationRepository extends
ServiceSupport
try {
l.lock();
DefaultExchangeHolder newHolder =
DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
- DefaultExchangeHolder oldHolder = cache.put(key, newHolder);
+ DefaultExchangeHolder oldHolder = replicatedCache.put(key,
newHolder);
return unmarshallExchange(camelContext, oldHolder);
} finally {
LOG.trace("Added an Exchange with ID {} for key {} in a
thread-safe manner.", exchange.getExchangeId(), key);
@@ -224,7 +193,7 @@ public class HazelcastAggregationRepository extends
ServiceSupport
public Set<String> scan(CamelContext camelContext) {
if (useRecovery) {
LOG.trace("Scanning for exchanges to recover in {} context",
camelContext.getName());
- Set<String> scanned =
Collections.unmodifiableSet(persistedCache.keySet());
+ Set<String> scanned =
Collections.unmodifiableSet(replicatedPersistedCache.keySet());
LOG.trace("Found {} keys for exchanges to recover in {} context",
scanned.size(), camelContext.getName());
return scanned;
} else {
@@ -237,57 +206,12 @@ public class HazelcastAggregationRepository extends
ServiceSupport
@Override
public Exchange recover(CamelContext camelContext, String exchangeId) {
LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
- return useRecovery ? unmarshallExchange(camelContext,
persistedCache.get(exchangeId)) : null;
- }
-
- @Override
- public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
- this.recoveryInterval = timeUnit.toMillis(interval);
- }
-
- @Override
- public void setRecoveryInterval(long interval) {
- this.recoveryInterval = interval;
- }
-
- @Override
- public long getRecoveryIntervalInMillis() {
- return recoveryInterval;
- }
-
- @Override
- public void setUseRecovery(boolean useRecovery) {
- this.useRecovery = useRecovery;
- }
-
- @Override
- public boolean isUseRecovery() {
- return useRecovery;
- }
-
- @Override
- public void setDeadLetterUri(String deadLetterUri) {
- this.deadLetterChannel = deadLetterUri;
- }
-
- @Override
- public String getDeadLetterUri() {
- return deadLetterChannel;
- }
-
- @Override
- public void setMaximumRedeliveries(int maximumRedeliveries) {
- this.maximumRedeliveries = maximumRedeliveries;
- }
-
- @Override
- public int getMaximumRedeliveries() {
- return maximumRedeliveries;
+ return useRecovery ? unmarshallExchange(camelContext,
replicatedPersistedCache.get(exchangeId)) : null;
}
@Override
public Exchange get(CamelContext camelContext, String key) {
- return unmarshallExchange(camelContext, cache.get(key));
+ return unmarshallExchange(camelContext, replicatedCache.get(key));
}
/**
@@ -296,21 +220,13 @@ public class HazelcastAggregationRepository extends
ServiceSupport
* @param key Object - key in question
*/
public boolean containsKey(Object key) {
- if (cache != null) {
- return cache.containsKey(key);
+ if (replicatedCache != null) {
+ return replicatedCache.containsKey(key);
} else {
return false;
}
}
- public boolean isAllowSerializedHeaders() {
- return allowSerializedHeaders;
- }
-
- public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
- this.allowSerializedHeaders = allowSerializedHeaders;
- }
-
/**
* This method performs transactional operation on removing the {@code
exchange}
* from the operational storage and moving it into the persistent one if
the {@link HazelcastAggregationRepository}
@@ -324,7 +240,7 @@ public class HazelcastAggregationRepository extends
ServiceSupport
DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange,
true, allowSerializedHeaders);
if (optimistic) {
LOG.trace("Removing an exchange with ID {} for key {} in an
optimistic manner.", exchange.getExchangeId(), key);
- if (!cache.remove(key, holder)) {
+ if (!replicatedCache.remove(key, holder)) {
LOG.error("Optimistic locking failed for exchange with key {}:
IMap#remove removed no Exchanges, while it's expected to remove one.",
key);
throw new OptimisticLockingException();
@@ -333,7 +249,7 @@ public class HazelcastAggregationRepository extends
ServiceSupport
if (useRecovery) {
LOG.trace("Putting an exchange with ID {} for key {} into a
recoverable storage in an optimistic manner.",
exchange.getExchangeId(), key);
- persistedCache.put(exchange.getExchangeId(), holder);
+ replicatedPersistedCache.put(exchange.getExchangeId(), holder);
LOG.trace("Put an exchange with ID {} for key {} into a
recoverable storage in an optimistic manner.",
exchange.getExchangeId(), key);
}
@@ -351,8 +267,8 @@ public class HazelcastAggregationRepository extends
ServiceSupport
try {
tCtx.beginTransaction();
- TransactionalMap<String, DefaultExchangeHolder> tCache =
tCtx.getMap(cache.getName());
- TransactionalMap<String, DefaultExchangeHolder>
tPersistentCache = tCtx.getMap(persistedCache.getName());
+ TransactionalMap<String, DefaultExchangeHolder> tCache =
tCtx.getMap(mapName);
+ TransactionalMap<String, DefaultExchangeHolder>
tPersistentCache = tCtx.getMap(persistenceMapName);
DefaultExchangeHolder removedHolder = tCache.remove(key);
LOG.trace("Putting an exchange with ID {} for key {} into
a recoverable storage in a thread-safe manner.",
@@ -372,7 +288,7 @@ public class HazelcastAggregationRepository extends
ServiceSupport
throw new RuntimeException(msg, throwable);
}
} else {
- cache.remove(key);
+ replicatedCache.remove(key);
}
}
}
@@ -381,20 +297,13 @@ public class HazelcastAggregationRepository extends
ServiceSupport
public void confirm(CamelContext camelContext, String exchangeId) {
LOG.trace("Confirming an exchange with ID {}.", exchangeId);
if (useRecovery) {
- persistedCache.remove(exchangeId);
+ replicatedPersistedCache.remove(exchangeId);
}
}
@Override
public Set<String> getKeys() {
- return Collections.unmodifiableSet(cache.keySet());
- }
-
- /**
- * @return Persistent repository {@link IMap} name;
- */
- public String getPersistentRepositoryName() {
- return persistenceMapName;
+ return Collections.unmodifiableSet(replicatedCache.keySet());
}
@Override
@@ -413,26 +322,10 @@ public class HazelcastAggregationRepository extends
ServiceSupport
} else {
ObjectHelper.notNull(hzInstance, "hzInstanse");
}
- cache = hzInstance.getMap(mapName);
+ replicatedCache = hzInstance.getReplicatedMap(mapName);
if (useRecovery) {
- persistedCache = hzInstance.getMap(persistenceMapName);
- }
- }
-
- @Override
- protected void doStop() throws Exception {
- //noop
- if (useLocalHzInstance) {
- hzInstance.getLifecycleService().shutdown();
+ replicatedPersistedCache =
hzInstance.getReplicatedMap(persistenceMapName);
}
}
- protected Exchange unmarshallExchange(CamelContext camelContext,
DefaultExchangeHolder holder) {
- Exchange exchange = null;
- if (holder != null) {
- exchange = new DefaultExchange(camelContext);
- DefaultExchangeHolder.unmarshal(exchange, holder);
- }
- return exchange;
- }
}