This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.14.x by this push:
     new 544332f187f3 CAMEL-23294: fix orphan lock race in JDBC idempotent 
repository (#22517)
544332f187f3 is described below

commit 544332f187f3f2c688584f54e010e59ba91526d4
Author: Federico Mariani <[email protected]>
AuthorDate: Fri Apr 10 13:57:25 2026 +0200

    CAMEL-23294: fix orphan lock race in JDBC idempotent repository (#22517)
    
    When multiple instances concurrently recover the same orphan lock,
    both succeed because add() ignores insert()'s return value and
    insert() updates the timestamp unconditionally.
    
    Fix:
    - add() now checks insert()'s return value (0 = not acquired)
    - insert() uses conditional UPDATE (WHERE createdAt < ?) for orphan
      recovery so only one instance can claim an orphan lock
---
 .../jdbc/AbstractJdbcMessageIdRepository.java      |   9 +-
 .../JdbcOrphanLockAwareIdempotentRepository.java   |  15 ++-
 ...ckAwareIdempotentRepositoryConcurrencyTest.java | 147 +++++++++++++++++++++
 3 files changed, 164 insertions(+), 7 deletions(-)

diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
index 17eadaadaafc..62cbca8a92f3 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
@@ -140,11 +140,12 @@ public abstract class AbstractJdbcMessageIdRepository 
extends ServiceSupport imp
             public Boolean doInTransaction(TransactionStatus status) {
                 int count = queryForInt(key);
                 if (count == 0) {
-                    insert(key);
-                    return Boolean.TRUE;
-                } else {
-                    return Boolean.FALSE;
+                    int insertedCount = insert(key);
+                    if (insertedCount != 0) {
+                        return Boolean.TRUE;
+                    }
                 }
+                return Boolean.FALSE;
             }
         });
         return rc.booleanValue();
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
index 95992703ff4b..0cb53e1e4388 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
@@ -123,9 +123,18 @@ public class JdbcOrphanLockAwareIdempotentRepository 
extends JdbcMessageIdReposi
                 processorNameMessageIdSet.add(new 
ProcessorNameAndMessageId(processorName, key));
                 return result;
             } else {
-                //Update in case of orphan lock where a process dies without 
releasing exist lock
-                return jdbcTemplate.update(getUpdateTimestampQuery(), 
currentTimestamp,
-                        processorName, key);
+                // Row exists — try to claim orphan lock with conditional 
UPDATE.
+                // Only succeeds if createdAt is older than lockMaxAge (lock 
is truly orphaned).
+                // Returns 0 if the lock is held by an active instance, 
preventing the race in CAMEL-23294.
+                String orphanLockRecoverQueryString = 
getUpdateTimestampQuery() + " AND createdAt < ?";
+                Timestamp xMillisAgo = new 
Timestamp(System.currentTimeMillis() - lockMaxAgeMillis);
+                int result
+                        = jdbcTemplate.update(orphanLockRecoverQueryString, 
currentTimestamp, processorName, key, xMillisAgo);
+                if (result > 0) {
+                    log.debug("Orphan lock seized for key: {}", key);
+                    processorNameMessageIdSet.add(new 
ProcessorNameAndMessageId(processorName, key));
+                }
+                return result;
             }
         } finally {
             sl.unlockWrite(stamp);
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryConcurrencyTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryConcurrencyTest.java
new file mode 100644
index 000000000000..d33ee3d6afcb
--- /dev/null
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepositoryConcurrencyTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.sql.Timestamp;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Reproduces CAMEL-23294: JDBC-based Idempotent repository race condition in 
orphan lock recovery.
+ * <p>
+ * When multiple instances (separate JVMs) concurrently detect an orphan lock 
and try to claim it, both succeed because:
+ * <ol>
+ * <li>{@code add()} ignores the return value of {@code insert()} — it always 
returns {@code true} when
+ * {@code queryForInt()} returned 0</li>
+ * <li>{@code insert()} does an unconditional UPDATE on orphan recovery with 
no guard to prevent two instances from both
+ * succeeding</li>
+ * </ol>
+ */
+public class JdbcOrphanLockAwareIdempotentRepositoryConcurrencyTest {
+
+    private static final String PROCESSOR_NAME = "TEST_PROCESSOR";
+    private static final long LOCK_MAX_AGE_MILLIS = 300_000L;
+    private static final long LOCK_KEEP_ALIVE_MILLIS = 3_000L;
+
+    private EmbeddedDatabase dataSource;
+    private JdbcTemplate jdbcTemplate;
+
+    @BeforeEach
+    void setup() {
+        dataSource = new EmbeddedDatabaseBuilder()
+                .setType(EmbeddedDatabaseType.HSQL)
+                .addScript("classpath:sql/idempotentWithOrphanLockRemoval.sql")
+                .generateUniqueName(true)
+                .build();
+        jdbcTemplate = new JdbcTemplate(dataSource);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (dataSource != null) {
+            dataSource.shutdown();
+        }
+    }
+
+    private JdbcOrphanLockAwareIdempotentRepository createRepository() throws 
Exception {
+        JdbcOrphanLockAwareIdempotentRepository repo
+                = new JdbcOrphanLockAwareIdempotentRepository(dataSource, 
PROCESSOR_NAME, new DefaultCamelContext());
+        repo.setLockMaxAgeMillis(LOCK_MAX_AGE_MILLIS);
+        repo.setLockKeepAliveIntervalMillis(LOCK_KEEP_ALIVE_MILLIS);
+        repo.doInit();
+        return repo;
+    }
+
+    /**
+     * CAMEL-23294: Two instances concurrently try to recover the same orphan 
lock. Only one should succeed.
+     * <p>
+     * Each repository instance has its own {@code StampedLock}, so the 
JVM-local lock provides no cross-instance
+     * protection — this accurately simulates two separate JVM instances 
sharing the same database.
+     */
+    @Test
+    void testConcurrentOrphanLockRecoveryShouldAllowOnlyOneInstance() throws 
Exception {
+        String key = "ORPHAN_RACE_KEY";
+
+        // Insert an orphan lock (timestamp well past lockMaxAge)
+        Timestamp orphanTimestamp = new Timestamp(System.currentTimeMillis() - 
LOCK_MAX_AGE_MILLIS - 60_000L);
+        jdbcTemplate.update(
+                "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, 
createdAt) VALUES (?, ?, ?)",
+                PROCESSOR_NAME, key, orphanTimestamp);
+
+        // Two separate repository instances simulate two JVM instances
+        JdbcOrphanLockAwareIdempotentRepository instance1 = createRepository();
+        JdbcOrphanLockAwareIdempotentRepository instance2 = createRepository();
+
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        AtomicBoolean result1 = new AtomicBoolean(false);
+        AtomicBoolean result2 = new AtomicBoolean(false);
+        AtomicReference<Exception> error1 = new AtomicReference<>();
+        AtomicReference<Exception> error2 = new AtomicReference<>();
+
+        Thread t1 = new Thread(() -> {
+            try {
+                barrier.await(5, TimeUnit.SECONDS);
+                result1.set(instance1.add(key));
+            } catch (Exception e) {
+                error1.set(e);
+            }
+        });
+
+        Thread t2 = new Thread(() -> {
+            try {
+                barrier.await(5, TimeUnit.SECONDS);
+                result2.set(instance2.add(key));
+            } catch (Exception e) {
+                error2.set(e);
+            }
+        });
+
+        t1.start();
+        t2.start();
+        t1.join(10_000);
+        t2.join(10_000);
+
+        assertNull(error1.get(), () -> "Instance 1 threw: " + error1.get());
+        assertNull(error2.get(), () -> "Instance 2 threw: " + error2.get());
+
+        // At least one instance must acquire the lock
+        assertTrue(result1.get() || result2.get(),
+                "At least one instance should acquire the orphan lock");
+
+        // CAMEL-23294: Both add() calls return true because:
+        // 1. Both queryForInt() return 0 (orphan lock: row exists but 
createdAt is too old)
+        // 2. Both insert() update the timestamp unconditionally (no guard on 
createdAt)
+        // 3. add() ignores insert()'s return value — always returns true when 
queryForInt() was 0
+        assertFalse(result1.get() && result2.get(),
+                "CAMEL-23294: Both instances acquired the same orphan lock — 
only one should succeed");
+    }
+}

Reply via email to