This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 5c4b7e8 Polished
5c4b7e8 is described below
commit 5c4b7e8bf2e5676c3bda5e1aca3b8a7d07371fae
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Mar 31 07:46:02 2021 +0200
Polished
---
.../apache/camel/catalog/docs/sql-component.adoc | 19 ++++++++++++++++++
.../JdbcOrphanLockAwareIdempotentRepository.java | 23 +++++++++++-----------
.../modules/ROOT/pages/sql-component.adoc | 19 ++++++++++++++++++
3 files changed, 49 insertions(+), 12 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
index a42a90e..af5a795 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sql-component.adoc
@@ -574,6 +574,25 @@ the second one is the message id (`String`).
The option `tableName` can be used to use the default SQL queries but with a
different table name.
However if you want to customize the SQL queries then you can configure each
of them individually.
+=== Orphan Lock aware Jdbc IdempotentRepository
+
+One of the limitations of
`org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it
does not handle orphan locks resulting from a JVM crash or non-graceful shutdown.
This can result in unprocessed files/messages if this implementation is used
with camel-file, camel-ftp etc. If you need to handle orphan locks,
then use
+`org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`.
This repository keeps track of the locks held by an instance of the
application. For each lock held, the application sends keep-alive signals
to the lock repository, which updates the createdAt column with the
current timestamp. When an application instance tries to acquire a lock,
there are three possibilities:
+
+* The lock entry does not exist. The lock is then provided using the base
implementation of `JdbcMessageIdRepository`.
+
+* The lock already exists and createdAt > System.currentTimeMillis() -
lockMaxAgeMillis. In this case it is assumed that an active instance has the
lock, and the lock is not provided to the new instance requesting it.
+
+* The lock already exists and createdAt <= System.currentTimeMillis() -
lockMaxAgeMillis. In this case it is assumed that no active instance
holds the lock, and the lock is provided to the requesting instance. The
reason is that if the original instance holding the lock were
still running, it would have updated the timestamp in createdAt using its
keep-alive mechanism.
+
+This repository has two additional configuration parameters:
+
+[cols="1,1"]
+|===
+|Parameter | Description
+|lockMaxAgeMillis | The duration after which a lock is
considered orphaned, i.e. if currentTimestamp - createdAt >=
lockMaxAgeMillis then the lock is orphaned.
+|lockKeepAliveIntervalMillis | The interval at which keep-alive updates are
made to the createdAt timestamp column.
+|===
== Using the JDBC based aggregation repository
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
index a787e5a..a84cbff 100644
---
a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
+++
b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcOrphanLockAwareIdempotentRepository.java
@@ -45,8 +45,6 @@ import
org.springframework.transaction.support.TransactionTemplate;
* A lock is granted to an instance if either the entry for the lock
attributes do not exists in the
* CAMEL_MESSAGEPROCESSED table or if in case the instance holding the lock
has crashed. This is determined if the
* timestamp on the createdAt column is more than the lockMaxAge.
- *
- * *
*/
public class JdbcOrphanLockAwareIdempotentRepository extends
JdbcMessageIdRepository implements ShutdownableService {
@@ -144,16 +142,22 @@ public class JdbcOrphanLockAwareIdempotentRepository
extends JdbcMessageIdReposi
updateTimestampQuery =
updateTimestampQuery.replaceFirst(DEFAULT_TABLENAME, getTableName());
}
executorServiceManager = context.getExecutorServiceManager();
- executorService =
executorServiceManager.newSingleThreadScheduledExecutor(this,
this.getClass().getName());
- /**
- * Schedule a task which will keep updating the timestamp on the
acquired locks at lockKeepAliveInterval so that
- * the timestamp does not reaches lockMaxAge
- */
+ executorService =
executorServiceManager.newSingleThreadScheduledExecutor(this,
this.getClass().getSimpleName());
+
+ // Schedule a task which keeps updating the timestamp on the
acquired locks at lockKeepAliveInterval so that
+ // the timestamp does not reach lockMaxAge
executorService.scheduleWithFixedDelay(new LockKeepAliveTask(),
lockKeepAliveIntervalMillis,
lockKeepAliveIntervalMillis, TimeUnit.MILLISECONDS);
}
@Override
+ protected void doShutdown() throws Exception {
+ if (executorServiceManager != null && executorService != null) {
+ executorServiceManager.shutdownGraceful(executorService);
+ }
+ }
+
+ @Override
protected int delete() {
long stamp = sl.writeLock();
try {
@@ -182,11 +186,6 @@ public class JdbcOrphanLockAwareIdempotentRepository
extends JdbcMessageIdReposi
}
}
- @Override
- public void shutdown() {
- executorServiceManager.shutdownGraceful(executorService);
- }
-
public Set<ProcessorNameAndMessageId> getProcessorNameMessageIdSet() {
return processorNameMessageIdSet;
}
diff --git a/docs/components/modules/ROOT/pages/sql-component.adoc
b/docs/components/modules/ROOT/pages/sql-component.adoc
index 9a187de..9263b64 100644
--- a/docs/components/modules/ROOT/pages/sql-component.adoc
+++ b/docs/components/modules/ROOT/pages/sql-component.adoc
@@ -576,6 +576,25 @@ the second one is the message id (`String`).
The option `tableName` can be used to use the default SQL queries but with a
different table name.
However if you want to customize the SQL queries then you can configure each
of them individually.
+=== Orphan Lock aware Jdbc IdempotentRepository
+
+One of the limitations of
`org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it
does not handle orphan locks resulting from a JVM crash or non-graceful shutdown.
This can result in unprocessed files/messages if this implementation is used
with camel-file, camel-ftp etc. If you need to handle orphan locks,
then use
+`org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`.
This repository keeps track of the locks held by an instance of the
application. For each lock held, the application sends keep-alive signals
to the lock repository, which updates the createdAt column with the
current timestamp. When an application instance tries to acquire a lock,
there are three possibilities:
+
+* The lock entry does not exist. The lock is then provided using the base
implementation of `JdbcMessageIdRepository`.
+
+* The lock already exists and createdAt > System.currentTimeMillis() -
lockMaxAgeMillis. In this case it is assumed that an active instance has the
lock, and the lock is not provided to the new instance requesting it.
+
+* The lock already exists and createdAt <= System.currentTimeMillis() -
lockMaxAgeMillis. In this case it is assumed that no active instance
holds the lock, and the lock is provided to the requesting instance. The
reason is that if the original instance holding the lock were
still running, it would have updated the timestamp in createdAt using its
keep-alive mechanism.
+
+This repository has two additional configuration parameters:
+
+[cols="1,1"]
+|===
+|Parameter | Description
+|lockMaxAgeMillis | The duration after which a lock is
considered orphaned, i.e. if currentTimestamp - createdAt >=
lockMaxAgeMillis then the lock is orphaned.
+|lockKeepAliveIntervalMillis | The interval at which keep-alive updates are
made to the createdAt timestamp column.
+|===
== Using the JDBC based aggregation repository
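
Note on the orphan lock rules documented in this commit: the three acquisition cases can be sketched in plain Java as below. This is only an illustration, not code from camel-sql. The class `OrphanLockSketch`, the `Decision` enum and the `decide` method are hypothetical names, and the in-memory `OptionalLong` stands in for the createdAt value that the repository keeps in the CAMEL_MESSAGEPROCESSED table. The boundary follows the rule from the parameter table (`currentTimestamp - createdAt >= lockMaxAgeMillis` means orphaned).

```java
import java.util.OptionalLong;

// Hypothetical illustration only; not part of camel-sql.
final class OrphanLockSketch {

    /** Possible outcomes when an instance asks for a lock. */
    enum Decision { GRANTED_NEW, DENIED_ACTIVE_HOLDER, GRANTED_ORPHAN_TAKEOVER }

    /**
     * Decides whether a lock request succeeds.
     *
     * @param createdAt        timestamp of the existing lock entry, empty if no entry exists
     * @param now              current time in milliseconds
     * @param lockMaxAgeMillis age after which a lock is considered orphaned
     */
    static Decision decide(OptionalLong createdAt, long now, long lockMaxAgeMillis) {
        if (!createdAt.isPresent()) {
            // Case 1: no lock entry, so the lock is simply granted.
            return Decision.GRANTED_NEW;
        }
        long age = now - createdAt.getAsLong();
        if (age >= lockMaxAgeMillis) {
            // Case 3: the holder stopped refreshing createdAt, so the lock is orphaned.
            return Decision.GRANTED_ORPHAN_TAKEOVER;
        }
        // Case 2: createdAt was refreshed recently, so a live instance holds the lock.
        return Decision.DENIED_ACTIVE_HOLDER;
    }

    public static void main(String[] args) {
        long now = 100_000L;
        long lockMaxAgeMillis = 30_000L;
        System.out.println(decide(OptionalLong.empty(), now, lockMaxAgeMillis));     // GRANTED_NEW
        System.out.println(decide(OptionalLong.of(90_000L), now, lockMaxAgeMillis)); // DENIED_ACTIVE_HOLDER
        System.out.println(decide(OptionalLong.of(70_000L), now, lockMaxAgeMillis)); // GRANTED_ORPHAN_TAKEOVER
    }
}
```

For this scheme to work, lockKeepAliveIntervalMillis presumably needs to be comfortably smaller than lockMaxAgeMillis, so that a running holder always refreshes createdAt before its lock can look orphaned.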