This is an automated email from the ASF dual-hosted git repository.

Croway pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e6bf966eed8585ce133c4f1e0973913b7f35e76c
Author: Croway <[email protected]>
AuthorDate: Mon Jun 8 17:57:54 2026 +0200

    CAMEL-23714: Polish RedisAggregationRepository and add tests
---
 .../org/apache/camel/catalog/beans.properties      |   1 +
 .../catalog/beans/RedisAggregationRepository.json  |  16 ++
 .../RedisAggregationRepositoryConfigurer.java      |  96 ++++++++
 .../services/org/apache/camel/bean.properties      |   7 +
 .../camel/bean/RedisAggregationRepository.json     |  16 ++
 ....processor.aggregate.RedisAggregationRepository |   2 +
 .../aggregate/RedisAggregationRepository.java      |  21 +-
 .../RedisAggregationRepositoryOperationsIT.java    | 274 +++++++++++++++++++++
 8 files changed, 427 insertions(+), 6 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans.properties
index 83ad5f38ee54..374208cf7ff6 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans.properties
@@ -37,6 +37,7 @@ MemoryAggregationRepository
 MemoryIdempotentRepository
 MongoDbIdempotentRepository
 OpensearchBulkRequestAggregationStrategy
+RedisAggregationRepository
 SimpleScheduledRoutePolicy
 SpringCacheIdempotentRepository
 SpringRedisIdempotentRepository
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/RedisAggregationRepository.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/RedisAggregationRepository.json
new file mode 100644
index 000000000000..2dc2c896f3f7
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/RedisAggregationRepository.json
@@ -0,0 +1,16 @@
+{
+  "bean": {
+    "kind": "bean",
+    "name": "RedisAggregationRepository",
+    "javaType": 
"org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository",
+    "interfaceType": "org.apache.camel.spi.AggregationRepository",
+    "title": "Redis Aggregation Repository",
+    "description": "Aggregation repository that uses Redis to store 
exchanges.",
+    "deprecated": false,
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-redis",
+    "version": "4.21.0-SNAPSHOT",
+    "properties": { "redisson": { "index": 0, "kind": "property", 
"displayName": "Redisson", "label": "advanced", "required": false, "type": 
"object", "javaType": "org.redisson.api.RedissonClient", "deprecated": false, 
"autowired": false, "secret": false, "description": "To use an existing Redis 
client to connect to Redis server" }, "endpoint": { "index": 1, "kind": 
"property", "displayName": "Endpoint", "required": true, "type": "string", 
"javaType": "java.lang.String", "deprecated": fa [...]
+  }
+}
+
diff --git 
a/components/camel-redis/src/generated/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepositoryConfigurer.java
 
b/components/camel-redis/src/generated/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepositoryConfigurer.java
new file mode 100644
index 000000000000..b696d8441c05
--- /dev/null
+++ 
b/components/camel-redis/src/generated/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepositoryConfigurer.java
@@ -0,0 +1,96 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.redis.processor.aggregate;
+
+import javax.annotation.processing.Generated;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import 
org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo")
+@SuppressWarnings("unchecked")
+public class RedisAggregationRepositoryConfigurer extends 
org.apache.camel.support.component.PropertyConfigurerSupport implements 
GeneratedPropertyConfigurer, PropertyConfigurerGetter {
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String 
name, Object value, boolean ignoreCase) {
+        
org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository 
target = 
(org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository)
 obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "allowserializedheaders":
+        case "allowSerializedHeaders": 
target.setAllowSerializedHeaders(property(camelContext, boolean.class, value)); 
return true;
+        case "deadletteruri":
+        case "deadLetterUri": target.setDeadLetterUri(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "endpoint": target.setEndpoint(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "mapname":
+        case "mapName": target.setMapName(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "maximumredeliveries":
+        case "maximumRedeliveries": 
target.setMaximumRedeliveries(property(camelContext, int.class, value)); return 
true;
+        case "optimistic": target.setOptimistic(property(camelContext, 
boolean.class, value)); return true;
+        case "persistencemapname":
+        case "persistenceMapName": 
target.setPersistenceMapName(property(camelContext, java.lang.String.class, 
value)); return true;
+        case "recoveryinterval":
+        case "recoveryInterval": 
target.setRecoveryInterval(property(camelContext, long.class, value)); return 
true;
+        case "redisson": target.setRedisson(property(camelContext, 
org.redisson.api.RedissonClient.class, value)); return true;
+        case "userecovery":
+        case "useRecovery": target.setUseRecovery(property(camelContext, 
boolean.class, value)); return true;
+        default: return false;
+        }
+    }
+
+    @Override
+    public Class<?> getOptionType(String name, boolean ignoreCase) {
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "allowserializedheaders":
+        case "allowSerializedHeaders": return boolean.class;
+        case "deadletteruri":
+        case "deadLetterUri": return java.lang.String.class;
+        case "endpoint": return java.lang.String.class;
+        case "mapname":
+        case "mapName": return java.lang.String.class;
+        case "maximumredeliveries":
+        case "maximumRedeliveries": return int.class;
+        case "optimistic": return boolean.class;
+        case "persistencemapname":
+        case "persistenceMapName": return java.lang.String.class;
+        case "recoveryinterval":
+        case "recoveryInterval": return long.class;
+        case "redisson": return org.redisson.api.RedissonClient.class;
+        case "userecovery":
+        case "useRecovery": return boolean.class;
+        default: return null;
+        }
+    }
+
+    @Override
+    public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+        
org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository 
target = 
(org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository)
 obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "allowserializedheaders":
+        case "allowSerializedHeaders": return 
target.isAllowSerializedHeaders();
+        case "deadletteruri":
+        case "deadLetterUri": return target.getDeadLetterUri();
+        case "endpoint": return target.getEndpoint();
+        case "mapname":
+        case "mapName": return target.getMapName();
+        case "maximumredeliveries":
+        case "maximumRedeliveries": return target.getMaximumRedeliveries();
+        case "optimistic": return target.isOptimistic();
+        case "persistencemapname":
+        case "persistenceMapName": return target.getPersistenceMapName();
+        case "recoveryinterval":
+        case "recoveryInterval": return target.getRecoveryInterval();
+        case "redisson": return target.getRedisson();
+        case "userecovery":
+        case "useRecovery": return target.isUseRecovery();
+        default: return null;
+        }
+    }
+}
+
diff --git 
a/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean.properties
 
b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean.properties
new file mode 100644
index 000000000000..dd7d4f21e0d9
--- /dev/null
+++ 
b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+bean=RedisAggregationRepository
+groupId=org.apache.camel
+artifactId=camel-redis
+version=4.21.0-SNAPSHOT
+projectName=Camel :: Redis
+projectDescription=Aggregation repository using Redis as datastore
diff --git 
a/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean/RedisAggregationRepository.json
 
b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean/RedisAggregationRepository.json
new file mode 100644
index 000000000000..2dc2c896f3f7
--- /dev/null
+++ 
b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/bean/RedisAggregationRepository.json
@@ -0,0 +1,16 @@
+{
+  "bean": {
+    "kind": "bean",
+    "name": "RedisAggregationRepository",
+    "javaType": 
"org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository",
+    "interfaceType": "org.apache.camel.spi.AggregationRepository",
+    "title": "Redis Aggregation Repository",
+    "description": "Aggregation repository that uses Redis to store 
exchanges.",
+    "deprecated": false,
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-redis",
+    "version": "4.21.0-SNAPSHOT",
+    "properties": { "redisson": { "index": 0, "kind": "property", 
"displayName": "Redisson", "label": "advanced", "required": false, "type": 
"object", "javaType": "org.redisson.api.RedissonClient", "deprecated": false, 
"autowired": false, "secret": false, "description": "To use an existing Redis 
client to connect to Redis server" }, "endpoint": { "index": 1, "kind": 
"property", "displayName": "Endpoint", "required": true, "type": "string", 
"javaType": "java.lang.String", "deprecated": fa [...]
+  }
+}
+
diff --git 
a/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository
 
b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository
new file mode 100644
index 000000000000..08615e00ff1d
--- /dev/null
+++ 
b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepositoryConfigurer
diff --git 
a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java
 
b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java
index c4d835dd1cda..214fbf4da58f 100644
--- 
a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java
+++ 
b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java
@@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Configurer;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.spi.RecoverableAggregationRepository;
@@ -43,6 +45,10 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link org.apache.camel.spi.AggregationRepository} using Redis as store.
  */
+@Metadata(label = "bean",
+          description = "Aggregation repository that uses Redis to store 
exchanges.",
+          annotations = { 
"interfaceName=org.apache.camel.spi.AggregationRepository" })
+@Configurer(metadataOnly = true)
 public class RedisAggregationRepository extends ServiceSupport
         implements RecoverableAggregationRepository, 
OptimisticLockingAggregationRepository {
     private static final Logger LOG = 
LoggerFactory.getLogger(RedisAggregationRepository.class);
@@ -118,7 +124,7 @@ public class RedisAggregationRepository extends 
ServiceSupport
             if (misbehaviorHolder != null) {
                 Exchange misbehaviorEx = unmarshallExchange(camelContext, 
misbehaviorHolder);
                 LOG.warn(
-                        "Optimistic locking failed for exchange with key {}: 
IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges 
to be returned",
+                        "Optimistic locking failed for exchange with key {}: 
RMap#putIfAbsent returned Exchange with ID {}, while it's expected no exchanges 
to be returned",
                         key, misbehaviorEx != null ? 
misbehaviorEx.getExchangeId() : "<null>");
                 throw new OptimisticLockingException();
             }
@@ -127,7 +133,7 @@ public class RedisAggregationRepository extends 
ServiceSupport
             DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
             if (!cache.replace(key, oldHolder, newHolder)) {
                 LOG.warn(
-                        "Optimistic locking failed for exchange with key {}: 
IMap#replace returned no Exchanges, while it's expected to replace one",
+                        "Optimistic locking failed for exchange with key {}: 
RMap#replace returned no Exchanges, while it's expected to replace one",
                         key);
                 throw new OptimisticLockingException();
             }
@@ -142,10 +148,10 @@ public class RedisAggregationRepository extends 
ServiceSupport
             throw new UnsupportedOperationException();
         }
         LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe 
manner.", exchange.getExchangeId(), key);
-        RLock lock = redisson.getLock("aggregationLock");
+        DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
+        RLock lock = redisson.getLock(mapName + "-lock-" + key);
         try {
             lock.lock();
-            DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
             DefaultExchangeHolder oldHolder = cache.put(key, newHolder);
             return unmarshallExchange(camelContext, oldHolder);
         } finally {
@@ -293,7 +299,7 @@ public class RedisAggregationRepository extends 
ServiceSupport
             LOG.trace("Removing an exchange with ID {} for key {} in an 
optimistic manner.", exchange.getExchangeId(), key);
             if (!cache.remove(key, holder)) {
                 LOG.warn(
-                        "Optimistic locking failed for exchange with key {}: 
IMap#remove removed no Exchanges, while it's expected to remove one.",
+                        "Optimistic locking failed for exchange with key {}: 
RMap#remove removed no Exchanges, while it's expected to remove one.",
                         key);
                 throw new OptimisticLockingException();
             }
@@ -332,7 +338,7 @@ public class RedisAggregationRepository extends 
ServiceSupport
                             "Transaction was rolled back for remove operation 
with a key %s and an Exchange ID %s.",
                             key, exchange.getExchangeId());
                     LOG.warn(msg, throwable);
-                    throw new RuntimeException(msg, throwable);
+                    throw new RuntimeCamelException(msg, throwable);
                 }
             } else {
                 cache.remove(key);
@@ -360,6 +366,9 @@ public class RedisAggregationRepository extends 
ServiceSupport
     @Override
     protected void doInit() throws Exception {
         StringHelper.notEmpty(mapName, "repositoryName");
+        if (redisson == null) {
+            StringHelper.notEmpty(endpoint, "endpoint");
+        }
         if (maximumRedeliveries < 0) {
             throw new IllegalArgumentException("Maximum redelivery retries 
must be zero or a positive integer.");
         }
diff --git 
a/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/integration/RedisAggregationRepositoryOperationsIT.java
 
b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/integration/RedisAggregationRepositoryOperationsIT.java
new file mode 100644
index 000000000000..07231bf520d1
--- /dev/null
+++ 
b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/integration/RedisAggregationRepositoryOperationsIT.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis.processor.aggregate.integration;
+
+import java.util.Set;
+
+import org.apache.camel.Exchange;
+import 
org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository;
+import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.test.infra.redis.services.RedisService;
+import org.apache.camel.test.infra.redis.services.RedisServiceFactory;
+import org.apache.camel.test.junit6.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.redisson.Redisson;
+import org.redisson.api.RedissonClient;
+import org.redisson.config.Config;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RedisAggregationRepositoryOperationsIT extends CamelTestSupport {
+
+    @RegisterExtension
+    static RedisService service = RedisServiceFactory.createService();
+
+    private RedisAggregationRepository createRepo(String mapName, boolean 
optimistic) {
+        RedisAggregationRepository repo = new 
RedisAggregationRepository(mapName, service.getServiceAddress(), optimistic);
+        repo.start();
+        return repo;
+    }
+
+    private Exchange createExchange(String body) {
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(body);
+        return exchange;
+    }
+
+    @Test
+    public void testPessimisticAddAndGet() {
+        RedisAggregationRepository repo = createRepo("pessimisticAddGet", 
false);
+        try {
+            Exchange exchange = createExchange("testBody");
+            Exchange old = repo.add(context, "key1", exchange);
+            assertNull(old);
+
+            Exchange result = repo.get(context, "key1");
+            assertNotNull(result);
+            assertEquals("testBody", result.getIn().getBody(String.class));
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testPessimisticReplace() {
+        RedisAggregationRepository repo = createRepo("pessimisticReplace", 
false);
+        try {
+            Exchange first = createExchange("first");
+            repo.add(context, "key1", first);
+
+            Exchange second = createExchange("second");
+            Exchange old = repo.add(context, "key1", second);
+            assertNotNull(old);
+            assertEquals("first", old.getIn().getBody(String.class));
+
+            Exchange result = repo.get(context, "key1");
+            assertEquals("second", result.getIn().getBody(String.class));
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testOptimisticAddAndGet() {
+        RedisAggregationRepository repo = createRepo("optimisticAddGet", true);
+        try {
+            Exchange exchange = createExchange("testBody");
+            Exchange old = repo.add(context, "key1", null, exchange);
+            assertNull(old);
+
+            Exchange result = repo.get(context, "key1");
+            assertNotNull(result);
+            assertEquals("testBody", result.getIn().getBody(String.class));
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testOptimisticUpdate() {
+        RedisAggregationRepository repo = createRepo("optimisticUpdate", true);
+        try {
+            Exchange first = createExchange("first");
+            repo.add(context, "key1", null, first);
+
+            Exchange retrieved = repo.get(context, "key1");
+            Exchange updated = createExchange("updated");
+            Exchange old = repo.add(context, "key1", retrieved, updated);
+            assertNotNull(old);
+
+            Exchange result = repo.get(context, "key1");
+            assertEquals("updated", result.getIn().getBody(String.class));
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testOptimisticRemoveWithRecovery() {
+        RedisAggregationRepository repo = 
createRepo("optimisticRemoveRecovery", true);
+        try {
+            Exchange exchange = createExchange("recoverable");
+            repo.add(context, "key1", null, exchange);
+
+            Exchange toRemove = repo.get(context, "key1");
+            repo.remove(context, "key1", toRemove);
+
+            assertNull(repo.get(context, "key1"));
+
+            Set<String> scanned = repo.scan(context);
+            assertEquals(1, scanned.size());
+
+            String exchangeId = scanned.iterator().next();
+            Exchange recovered = repo.recover(context, exchangeId);
+            assertNotNull(recovered);
+            assertEquals("recoverable", 
recovered.getIn().getBody(String.class));
+
+            repo.confirm(context, exchangeId);
+            assertTrue(repo.scan(context).isEmpty());
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testPessimisticRemoveWithRecovery() {
+        RedisAggregationRepository repo = 
createRepo("pessimisticRemoveRecovery", false);
+        try {
+            Exchange exchange = createExchange("recoverable");
+            repo.add(context, "key1", exchange);
+
+            Exchange toRemove = repo.get(context, "key1");
+            repo.remove(context, "key1", toRemove);
+
+            assertNull(repo.get(context, "key1"));
+
+            Set<String> scanned = repo.scan(context);
+            assertEquals(1, scanned.size());
+
+            String exchangeId = scanned.iterator().next();
+            Exchange recovered = repo.recover(context, exchangeId);
+            assertNotNull(recovered);
+
+            repo.confirm(context, exchangeId);
+            assertTrue(repo.scan(context).isEmpty());
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testRemoveWithoutRecovery() {
+        RedisAggregationRepository repo
+                = new RedisAggregationRepository("removeNoRecovery", 
service.getServiceAddress(), true);
+        repo.setUseRecovery(false);
+        repo.start();
+        try {
+            Exchange exchange = createExchange("noRecover");
+            repo.add(context, "key1", null, exchange);
+
+            Exchange toRemove = repo.get(context, "key1");
+            repo.remove(context, "key1", toRemove);
+
+            assertNull(repo.get(context, "key1"));
+            assertTrue(repo.scan(context).isEmpty());
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testGetKeysAndContainsKey() {
+        RedisAggregationRepository repo = createRepo("keysAndContains", false);
+        try {
+            repo.add(context, "a", createExchange("1"));
+            repo.add(context, "b", createExchange("2"));
+
+            Set<String> keys = repo.getKeys();
+            assertEquals(2, keys.size());
+            assertTrue(keys.contains("a"));
+            assertTrue(keys.contains("b"));
+
+            assertTrue(repo.containsKey("a"));
+            assertFalse(repo.containsKey("nonexistent"));
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testOptimisticRepoRejectsPessimisticAdd() {
+        RedisAggregationRepository repo = createRepo("rejectPessimistic", 
true);
+        try {
+            Exchange exchange = createExchange("test");
+            assertThrows(UnsupportedOperationException.class, () -> 
repo.add(context, "key1", exchange));
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testPessimisticRepoRejectsOptimisticAdd() {
+        RedisAggregationRepository repo = createRepo("rejectOptimistic", 
false);
+        try {
+            Exchange exchange = createExchange("test");
+            assertThrows(UnsupportedOperationException.class, () -> 
repo.add(context, "key1", null, exchange));
+        } finally {
+            repo.stop();
+        }
+    }
+
+    @Test
+    public void testCustomRedissonClient() {
+        Config config = new Config();
+        config.useSingleServer().setAddress(String.format("redis://%s", 
service.getServiceAddress()));
+        RedissonClient customClient = Redisson.create(config);
+
+        try {
+            RedisAggregationRepository repo = new RedisAggregationRepository();
+            repo.setMapName("customClient");
+            repo.setRedisson(customClient);
+            repo.start();
+
+            Exchange exchange = createExchange("custom");
+            repo.add(context, "key1", exchange);
+
+            Exchange result = repo.get(context, "key1");
+            assertNotNull(result);
+            assertEquals("custom", result.getIn().getBody(String.class));
+
+            repo.stop();
+
+            assertFalse(customClient.isShutdown());
+        } finally {
+            customClient.shutdown();
+        }
+    }
+
+    @Test
+    public void testEndpointValidation() {
+        RedisAggregationRepository repo = new RedisAggregationRepository();
+        repo.setMapName("validation");
+        assertThrows(IllegalArgumentException.class, () -> repo.init());
+    }
+}

Reply via email to