Miroslav Borský created CAMEL-16750:
---------------------------------------

             Summary: Do not propagate exception when concurrent FILE component 
consumers try to acquire lock in JdbcMessageIdRepository
                 Key: CAMEL-16750
                 URL: https://issues.apache.org/jira/browse/CAMEL-16750
             Project: Camel
          Issue Type: Improvement
          Components: camel-sql
            Reporter: Miroslav Borský


If there are concurrent consumers trying to acquire lock using the 
org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository class, those 
not successful will receive an exception and propagate this exception as a 
warning to the log.

Use case: Several nodes processing files from common network path / mount. Only 
one of them will acquire the lock and process the file. This is normal 
operation and as such it should either not produce any WARN messages or at 
least not produce them when readLockLoggingLevel=OFF

Route to reproduce:
{noformat}
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0";
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
           xmlns:xs="http://www.w3.org/2001/XMLSchema";
           xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 
https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd";>
        <property-placeholder 
xmlns="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"; 
persistent-id="FileTransfer_Route" id="extProperties">
                <default-properties>
                        <property name="src.dir" value="c:/Aplikace/src"/>
                        <property name="arch.dir" value="c:/Aplikace/arch"/>
                        <property name="pooling.delay" value="1s"/>
                        <property name="readLock.checkInterval" value="1000"/>
                        <property name="readLock.timeout" value="2000"/>
                        <property name="readLock.release" value="60000"/>
                        <property name="readLock.wipe" value="120000"/>
                </default-properties>
        </property-placeholder>
        <bean id="localHost" class="java.net.InetAddress" 
factory-method="getLocalHost"/>
        <bean id="hostName" class="java.lang.String" factory-ref="localHost" 
factory-method="getHostName"/>
        <reference id="dataSource" interface="javax.sql.DataSource" 
filter="(osgi.jndi.service.name=jdbc/etl)"/>
        <bean id="messageIdStore" 
class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
                <argument ref="dataSource"/>
                <argument ref="hostName"/>
                <property name="createString" value="CREATE TABLE 
CAMEL_MESSAGEPROCESSED(processorName [varchar](255) NULL, messageId 
[varchar](100) NULL, createdAt datetime NULL, CONSTRAINT uMessageId 
UNIQUE(messageId))"/>
        </bean>
        <camelContext xmlns="http://camel.apache.org/schema/blueprint"; 
id="FileTransfer_Route-ctx">
                <propertyPlaceholder id="properties" 
location="blueprint:extProperties"/>
                <route>
                        <from 
uri="file:{{src.dir}}?move={{arch.dir}}/${header.CamelFileName}&amp;recursive=true&amp;autoCreate=false&amp;startingDirectoryMustExist=true&amp;delay={{pooling.delay}}&amp;greedy=true&amp;readLock=idempotent-changed&amp;readLockMinLength=0&amp;readLockCheckInterval={{readLock.checkInterval}}&amp;readLockTimeout={{readLock.timeout}}&amp;idempotentRepository=#messageIdStore&amp;readLockRemoveOnCommit=true&amp;readLockIdempotentReleaseAsync=true&amp;readLockIdempotentReleaseDelay={{readLock.release}}&amp;readLockLoggingLevel=OFF"/>

                        <to 
uri="file:e/dst?tempFileName=${header.CamelFileNameOnly}.tmp"/>
                </route>
        </camelContext>
</blueprint>
{noformat}

Exception stack:
{noformat}
2021-06-22T16:37:42,382 | WARN  | Camel (FileTransfer_Route-ctx) thread #0 - 
file://c:/Aplikace/src | FileConsumer                     | 105 - 
org.apache.camel.camel-support - 3.10.0 | 
file://c:/Aplikace/src?autoCreate=false&delay=1s&greedy=true&idempotentRepository=%23messageIdStore&move=c%3A%2FAplikace%2Farch%2F%24%7Bheader.CamelFileName%7D&readLock=idempotent-changed&readLockCheckInterval=1000&readLockIdempotentReleaseAsync=true&readLockIdempotentReleaseDelay=60000&readLockLoggingLevel=OFF&readLockMinLength=0&readLockRemoveOnCommit=true&readLockTimeout=2000&recursive=true&startingDirectoryMustExist=true
 cannot begin processing file: GenericFile[c:\Aplikace\src\mysql-connect
org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; SQL 
[INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) 
VALUES (?, ?, ?)]; Violation of UNIQUE KEY constraint 'uMessageId'. Cannot 
insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key 
value is (c:\Aplikace\src\file-currently-transfered-by-other-node).; nested 
exception is java.sql.SQLException: Violation of UNIQUE KEY constraint 
'uMessageId'. Cannot insert duplicate key in object 
'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is 
(c:\Aplikace\src\file-currently-transfered-by-other-node).
        at 
org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:247)
 ~[!/:?]
        at 
org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70)
 ~[!/:?]
        at 
org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1541)
 ~[!/:?]
        at 
org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:667) 
~[!/:?]
        at 
org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:960) ~[!/:?]
        at 
org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1015) 
~[!/:?]
        at 
org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1025) 
~[!/:?]
        at 
org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository.insert(JdbcMessageIdRepository.java:118)
 ~[!/:3.10.0]
        at 
org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository$1.doInTransaction(AbstractJdbcMessageIdRepository.java:137)
 ~[!/:3.10.0]
        at 
org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository$1.doInTransaction(AbstractJdbcMessageIdRepository.java:133)
 ~[!/:3.10.0]
        at 
org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
 ~[!/:?]
        at 
org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository.add(AbstractJdbcMessageIdRepository.java:133)
 ~[!/:3.10.0]
        at 
org.apache.camel.spi.IdempotentRepository.add(IdempotentRepository.java:95) 
~[!/:3.10.0]
        at 
org.apache.camel.component.file.strategy.FileIdempotentChangedRepositoryReadLockStrategy.acquireExclusiveReadLock(FileIdempotentChangedRepositoryReadLockStrategy.java:89)
 ~[!/:3.10.0]
        at 
org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.begin(GenericFileProcessStrategySupport.java:72)
 ~[!/:3.10.0]
        at 
org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.begin(GenericFileRenameProcessStrategy.java:39)
 ~[!/:3.10.0]
        at 
org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:389)
 [!/:3.10.0]
        at 
org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:246)
 [!/:3.10.0]
        at 
org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:207)
 [!/:3.10.0]
        at 
org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
 [!/:3.10.0]
        at 
org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
 [!/:3.10.0]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_265]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
[?:1.8.0_265]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_265]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [?:1.8.0_265]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_265]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_265]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.sql.SQLException: Violation of UNIQUE KEY constraint 
'uMessageId'. Cannot insert duplicate key in object 
'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is 
(c:\Aplikace\src\file-currently-transfered-by-other-node).
        at 
net.sourceforge.jtds.jdbc.SQLDiagnostic.addDiagnostic(SQLDiagnostic.java:372) 
~[!/:?]
        at net.sourceforge.jtds.jdbc.TdsCore.tdsErrorToken(TdsCore.java:2988) 
~[!/:?]
        at net.sourceforge.jtds.jdbc.TdsCore.nextToken(TdsCore.java:2421) 
~[!/:?]
        at net.sourceforge.jtds.jdbc.TdsCore.getMoreResults(TdsCore.java:671) 
~[!/:?]
        at 
net.sourceforge.jtds.jdbc.JtdsStatement.processResults(JtdsStatement.java:613) 
~[!/:?]
        at 
net.sourceforge.jtds.jdbc.JtdsStatement.executeSQL(JtdsStatement.java:572) 
~[!/:?]
        at 
net.sourceforge.jtds.jdbc.JtdsPreparedStatement.executeUpdate(JtdsPreparedStatement.java:727)
 ~[!/:?]
        at 
org.springframework.jdbc.core.JdbcTemplate.lambda$update$2(JdbcTemplate.java:965)
 ~[!/:?]
        at 
org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651) 
~[!/:?]
        ... 24 more
{noformat}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to