This is an automated email from the ASF dual-hosted git repository. Croway pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit e6bf966eed8585ce133c4f1e0973913b7f35e76c Author: Croway <[email protected]> AuthorDate: Mon Jun 8 17:57:54 2026 +0200 CAMEL-23714: Polish RedisAggregationRepository and add tests --- .../org/apache/camel/catalog/beans.properties | 1 + .../catalog/beans/RedisAggregationRepository.json | 16 ++ .../RedisAggregationRepositoryConfigurer.java | 96 ++++++++ .../services/org/apache/camel/bean.properties | 7 + .../camel/bean/RedisAggregationRepository.json | 16 ++ ....processor.aggregate.RedisAggregationRepository | 2 + .../aggregate/RedisAggregationRepository.java | 21 +- .../RedisAggregationRepositoryOperationsIT.java | 274 +++++++++++++++++++++ 8 files changed, 427 insertions(+), 6 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans.properties index 83ad5f38ee54..374208cf7ff6 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans.properties @@ -37,6 +37,7 @@ MemoryAggregationRepository MemoryIdempotentRepository MongoDbIdempotentRepository OpensearchBulkRequestAggregationStrategy +RedisAggregationRepository SimpleScheduledRoutePolicy SpringCacheIdempotentRepository SpringRedisIdempotentRepository diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/RedisAggregationRepository.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/RedisAggregationRepository.json new file mode 100644 index 000000000000..2dc2c896f3f7 --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/RedisAggregationRepository.json @@ -0,0 +1,16 @@ +{ + "bean": { + "kind": "bean", + "name": "RedisAggregationRepository", + "javaType": "org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository", + "interfaceType": "org.apache.camel.spi.AggregationRepository", + "title": "Redis Aggregation Repository", + "description": "Aggregation repository that uses Redis to store exchanges.", + "deprecated": false, + "groupId": "org.apache.camel", + "artifactId": "camel-redis", + "version": "4.21.0-SNAPSHOT", + "properties": { "redisson": { "index": 0, "kind": "property", "displayName": "Redisson", "label": "advanced", "required": false, "type": "object", "javaType": "org.redisson.api.RedissonClient", "deprecated": false, "autowired": false, "secret": false, "description": "To use an existing Redis client to connect to Redis server" }, "endpoint": { "index": 1, "kind": "property", "displayName": "Endpoint", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": fa [...] + } +} + diff --git a/components/camel-redis/src/generated/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepositoryConfigurer.java b/components/camel-redis/src/generated/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepositoryConfigurer.java new file mode 100644 index 000000000000..b696d8441c05 --- /dev/null +++ b/components/camel-redis/src/generated/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepositoryConfigurer.java @@ -0,0 +1,96 @@ +/* Generated by camel build tools - do NOT edit this file! */ +package org.apache.camel.component.redis.processor.aggregate; + +import javax.annotation.processing.Generated; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.ExtendedPropertyConfigurerGetter; +import org.apache.camel.spi.PropertyConfigurerGetter; +import org.apache.camel.spi.ConfigurerStrategy; +import org.apache.camel.spi.GeneratedPropertyConfigurer; +import org.apache.camel.util.CaseInsensitiveMap; +import org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository; + +/** + * Generated by camel build tools - do NOT edit this file! + */ +@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo") +@SuppressWarnings("unchecked") +public class RedisAggregationRepositoryConfigurer extends org.apache.camel.support.component.PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter { + + @Override + public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) { + org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository target = (org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "allowserializedheaders": + case "allowSerializedHeaders": target.setAllowSerializedHeaders(property(camelContext, boolean.class, value)); return true; + case "deadletteruri": + case "deadLetterUri": target.setDeadLetterUri(property(camelContext, java.lang.String.class, value)); return true; + case "endpoint": target.setEndpoint(property(camelContext, java.lang.String.class, value)); return true; + case "mapname": + case "mapName": target.setMapName(property(camelContext, java.lang.String.class, value)); return true; + case "maximumredeliveries": + case "maximumRedeliveries": target.setMaximumRedeliveries(property(camelContext, int.class, value)); return true; + case "optimistic": target.setOptimistic(property(camelContext, boolean.class, value)); return true; + case "persistencemapname": + case "persistenceMapName": target.setPersistenceMapName(property(camelContext, java.lang.String.class, value)); return true; + case "recoveryinterval": + case "recoveryInterval": target.setRecoveryInterval(property(camelContext, long.class, value)); return true; + case "redisson": target.setRedisson(property(camelContext, org.redisson.api.RedissonClient.class, value)); return true; + case "userecovery": + case "useRecovery": target.setUseRecovery(property(camelContext, boolean.class, value)); return true; + default: return false; + } + } + + @Override + public Class<?> getOptionType(String name, boolean ignoreCase) { + switch (ignoreCase ? name.toLowerCase() : name) { + case "allowserializedheaders": + case "allowSerializedHeaders": return boolean.class; + case "deadletteruri": + case "deadLetterUri": return java.lang.String.class; + case "endpoint": return java.lang.String.class; + case "mapname": + case "mapName": return java.lang.String.class; + case "maximumredeliveries": + case "maximumRedeliveries": return int.class; + case "optimistic": return boolean.class; + case "persistencemapname": + case "persistenceMapName": return java.lang.String.class; + case "recoveryinterval": + case "recoveryInterval": return long.class; + case "redisson": return org.redisson.api.RedissonClient.class; + case "userecovery": + case "useRecovery": return boolean.class; + default: return null; + } + } + + @Override + public Object getOptionValue(Object obj, String name, boolean ignoreCase) { + org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository target = (org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "allowserializedheaders": + case "allowSerializedHeaders": return target.isAllowSerializedHeaders(); + case "deadletteruri": + case "deadLetterUri": return target.getDeadLetterUri(); + case "endpoint": return target.getEndpoint(); + case "mapname": + case "mapName": return target.getMapName(); + case "maximumredeliveries": + case "maximumRedeliveries": return target.getMaximumRedeliveries(); + case "optimistic": return target.isOptimistic(); + case "persistencemapname": + case "persistenceMapName": return target.getPersistenceMapName(); + case "recoveryinterval": + case "recoveryInterval": return target.getRecoveryInterval(); + case "redisson": return target.getRedisson(); + case "userecovery": + case "useRecovery": return target.isUseRecovery(); + default: return null; + } + } +} + diff --git a/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean.properties b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean.properties new file mode 100644 index 000000000000..dd7d4f21e0d9 --- /dev/null +++ b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean.properties @@ -0,0 +1,7 @@ +# Generated by camel build tools - do NOT edit this file! +bean=RedisAggregationRepository +groupId=org.apache.camel +artifactId=camel-redis +version=4.21.0-SNAPSHOT +projectName=Camel :: Redis +projectDescription=Aggregation repository using Redis as datastore diff --git a/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean/RedisAggregationRepository.json b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean/RedisAggregationRepository.json new file mode 100644 index 000000000000..2dc2c896f3f7 --- /dev/null +++ b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean/RedisAggregationRepository.json @@ -0,0 +1,16 @@ +{ + "bean": { + "kind": "bean", + "name": "RedisAggregationRepository", + "javaType": "org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository", + "interfaceType": "org.apache.camel.spi.AggregationRepository", + "title": "Redis Aggregation Repository", + "description": "Aggregation repository that uses Redis to store exchanges.", + "deprecated": false, + "groupId": "org.apache.camel", + "artifactId": "camel-redis", + "version": "4.21.0-SNAPSHOT", + "properties": { "redisson": { "index": 0, "kind": "property", "displayName": "Redisson", "label": "advanced", "required": false, "type": "object", "javaType": "org.redisson.api.RedissonClient", "deprecated": false, "autowired": false, "secret": false, "description": "To use an existing Redis client to connect to Redis server" }, "endpoint": { "index": 1, "kind": "property", "displayName": "Endpoint", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": fa [...] + } +} + diff --git a/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository new file mode 100644 index 000000000000..08615e00ff1d --- /dev/null +++ b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepositoryConfigurer diff --git a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java index c4d835dd1cda..214fbf4da58f 100644 --- a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java +++ b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java @@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.spi.Configurer; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.OptimisticLockingAggregationRepository; import org.apache.camel.spi.RecoverableAggregationRepository; @@ -43,6 +45,10 @@ import org.slf4j.LoggerFactory; /** * {@link org.apache.camel.spi.AggregationRepository} using Redis as store. */ +@Metadata(label = "bean", + description = "Aggregation repository that uses Redis to store exchanges.", + annotations = { "interfaceName=org.apache.camel.spi.AggregationRepository" }) +@Configurer(metadataOnly = true) public class RedisAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { private static final Logger LOG = LoggerFactory.getLogger(RedisAggregationRepository.class); @@ -118,7 +124,7 @@ public class RedisAggregationRepository extends ServiceSupport if (misbehaviorHolder != null) { Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder); LOG.warn( - "Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", + "Optimistic locking failed for exchange with key {}: RMap#putIfAbsent returned Exchange with ID {}, while it's expected no exchanges to be returned", key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>"); throw new OptimisticLockingException(); } @@ -127,7 +133,7 @@ public class RedisAggregationRepository extends ServiceSupport DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); if (!cache.replace(key, oldHolder, newHolder)) { LOG.warn( - "Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", + "Optimistic locking failed for exchange with key {}: RMap#replace returned no Exchanges, while it's expected to replace one", key); throw new OptimisticLockingException(); } @@ -142,10 +148,10 @@ public class RedisAggregationRepository extends ServiceSupport throw new UnsupportedOperationException(); } LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); - RLock lock = redisson.getLock("aggregationLock"); + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + RLock lock = redisson.getLock(mapName + "-lock-" + key); try { lock.lock(); - DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); DefaultExchangeHolder oldHolder = cache.put(key, newHolder); return unmarshallExchange(camelContext, oldHolder); } finally { @@ -293,7 +299,7 @@ public class RedisAggregationRepository extends ServiceSupport LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key); if (!cache.remove(key, holder)) { LOG.warn( - "Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", + "Optimistic locking failed for exchange with key {}: RMap#remove removed no Exchanges, while it's expected to remove one.", key); throw new OptimisticLockingException(); } @@ -332,7 +338,7 @@ public class RedisAggregationRepository extends ServiceSupport "Transaction was rolled back for remove operation with a key %s and an Exchange ID %s.", key, exchange.getExchangeId()); LOG.warn(msg, throwable); - throw new RuntimeException(msg, throwable); + throw new RuntimeCamelException(msg, throwable); } } else { cache.remove(key); @@ -360,6 +366,9 @@ public class RedisAggregationRepository extends ServiceSupport @Override protected void doInit() throws Exception { StringHelper.notEmpty(mapName, "repositoryName"); + if (redisson == null) { + StringHelper.notEmpty(endpoint, "endpoint"); + } if (maximumRedeliveries < 0) { throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); } diff --git a/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/integration/RedisAggregationRepositoryOperationsIT.java b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/integration/RedisAggregationRepositoryOperationsIT.java new file mode 100644 index 000000000000..07231bf520d1 --- /dev/null +++ b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/integration/RedisAggregationRepositoryOperationsIT.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis.processor.aggregate.integration; + +import java.util.Set; + +import org.apache.camel.Exchange; +import org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository; +import org.apache.camel.support.DefaultExchange; +import org.apache.camel.test.infra.redis.services.RedisService; +import org.apache.camel.test.infra.redis.services.RedisServiceFactory; +import org.apache.camel.test.junit6.CamelTestSupport; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RedisAggregationRepositoryOperationsIT extends CamelTestSupport { + + @RegisterExtension + static RedisService service = RedisServiceFactory.createService(); + + private RedisAggregationRepository createRepo(String mapName, boolean optimistic) { + RedisAggregationRepository repo = new RedisAggregationRepository(mapName, service.getServiceAddress(), optimistic); + repo.start(); + return repo; + } + + private Exchange createExchange(String body) { + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setBody(body); + return exchange; + } + + @Test + public void testPessimisticAddAndGet() { + RedisAggregationRepository repo = createRepo("pessimisticAddGet", false); + try { + Exchange exchange = createExchange("testBody"); + Exchange old = repo.add(context, "key1", exchange); + assertNull(old); + + Exchange result = repo.get(context, "key1"); + assertNotNull(result); + assertEquals("testBody", result.getIn().getBody(String.class)); + } finally { + repo.stop(); + } + } + + @Test + public void testPessimisticReplace() { + RedisAggregationRepository repo = createRepo("pessimisticReplace", false); + try { + Exchange first = createExchange("first"); + repo.add(context, "key1", first); + + Exchange second = createExchange("second"); + Exchange old = repo.add(context, "key1", second); + assertNotNull(old); + assertEquals("first", old.getIn().getBody(String.class)); + + Exchange result = repo.get(context, "key1"); + assertEquals("second", result.getIn().getBody(String.class)); + } finally { + repo.stop(); + } + } + + @Test + public void testOptimisticAddAndGet() { + RedisAggregationRepository repo = createRepo("optimisticAddGet", true); + try { + Exchange exchange = createExchange("testBody"); + Exchange old = repo.add(context, "key1", null, exchange); + assertNull(old); + + Exchange result = repo.get(context, "key1"); + assertNotNull(result); + assertEquals("testBody", result.getIn().getBody(String.class)); + } finally { + repo.stop(); + } + } + + @Test + public void testOptimisticUpdate() { + RedisAggregationRepository repo = createRepo("optimisticUpdate", true); + try { + Exchange first = createExchange("first"); + repo.add(context, "key1", null, first); + + Exchange retrieved = repo.get(context, "key1"); + Exchange updated = createExchange("updated"); + Exchange old = repo.add(context, "key1", retrieved, updated); + assertNotNull(old); + + Exchange result = repo.get(context, "key1"); + assertEquals("updated", result.getIn().getBody(String.class)); + } finally { + repo.stop(); + } + } + + @Test + public void testOptimisticRemoveWithRecovery() { + RedisAggregationRepository repo = createRepo("optimisticRemoveRecovery", true); + try { + Exchange exchange = createExchange("recoverable"); + repo.add(context, "key1", null, exchange); + + Exchange toRemove = repo.get(context, "key1"); + repo.remove(context, "key1", toRemove); + + assertNull(repo.get(context, "key1")); + + Set<String> scanned = repo.scan(context); + assertEquals(1, scanned.size()); + + String exchangeId = scanned.iterator().next(); + Exchange recovered = repo.recover(context, exchangeId); + assertNotNull(recovered); + assertEquals("recoverable", recovered.getIn().getBody(String.class)); + + repo.confirm(context, exchangeId); + assertTrue(repo.scan(context).isEmpty()); + } finally { + repo.stop(); + } + } + + @Test + public void testPessimisticRemoveWithRecovery() { + RedisAggregationRepository repo = createRepo("pessimisticRemoveRecovery", false); + try { + Exchange exchange = createExchange("recoverable"); + repo.add(context, "key1", exchange); + + Exchange toRemove = repo.get(context, "key1"); + repo.remove(context, "key1", toRemove); + + assertNull(repo.get(context, "key1")); + + Set<String> scanned = repo.scan(context); + assertEquals(1, scanned.size()); + + String exchangeId = scanned.iterator().next(); + Exchange recovered = repo.recover(context, exchangeId); + assertNotNull(recovered); + + repo.confirm(context, exchangeId); + assertTrue(repo.scan(context).isEmpty()); + } finally { + repo.stop(); + } + } + + @Test + public void testRemoveWithoutRecovery() { + RedisAggregationRepository repo + = new RedisAggregationRepository("removeNoRecovery", service.getServiceAddress(), true); + repo.setUseRecovery(false); + repo.start(); + try { + Exchange exchange = createExchange("noRecover"); + repo.add(context, "key1", null, exchange); + + Exchange toRemove = repo.get(context, "key1"); + repo.remove(context, "key1", toRemove); + + assertNull(repo.get(context, "key1")); + assertTrue(repo.scan(context).isEmpty()); + } finally { + repo.stop(); + } + } + + @Test + public void testGetKeysAndContainsKey() { + RedisAggregationRepository repo = createRepo("keysAndContains", false); + try { + repo.add(context, "a", createExchange("1")); + repo.add(context, "b", createExchange("2")); + + Set<String> keys = repo.getKeys(); + assertEquals(2, keys.size()); + assertTrue(keys.contains("a")); + assertTrue(keys.contains("b")); + + assertTrue(repo.containsKey("a")); + assertFalse(repo.containsKey("nonexistent")); + } finally { + repo.stop(); + } + } + + @Test + public void testOptimisticRepoRejectsPessimisticAdd() { + RedisAggregationRepository repo = createRepo("rejectPessimistic", true); + try { + Exchange exchange = createExchange("test"); + assertThrows(UnsupportedOperationException.class, () -> repo.add(context, "key1", exchange)); + } finally { + repo.stop(); + } + } + + @Test + public void testPessimisticRepoRejectsOptimisticAdd() { + RedisAggregationRepository repo = createRepo("rejectOptimistic", false); + try { + Exchange exchange = createExchange("test"); + assertThrows(UnsupportedOperationException.class, () -> repo.add(context, "key1", null, exchange)); + } finally { + repo.stop(); + } + } + + @Test + public void testCustomRedissonClient() { + Config config = new Config(); + config.useSingleServer().setAddress(String.format("redis://%s", service.getServiceAddress())); + RedissonClient customClient = Redisson.create(config); + + try { + RedisAggregationRepository repo = new RedisAggregationRepository(); + repo.setMapName("customClient"); + repo.setRedisson(customClient); + repo.start(); + + Exchange exchange = createExchange("custom"); + repo.add(context, "key1", exchange); + + Exchange result = repo.get(context, "key1"); + assertNotNull(result); + assertEquals("custom", result.getIn().getBody(String.class)); + + repo.stop(); + + assertFalse(customClient.isShutdown()); + } finally { + customClient.shutdown(); + } + } + + @Test + public void testEndpointValidation() { + RedisAggregationRepository repo = new RedisAggregationRepository(); + repo.setMapName("validation"); + assertThrows(IllegalArgumentException.class, () -> repo.init()); + } +}
