This is an automated email from the ASF dual-hosted git repository.
klease pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new fd98cd3d50d CAMEL-18275: Address issue of completions not being run in
SEDA pipeline (#8015)
fd98cd3d50d is described below
commit fd98cd3d50d1c72f79457ef50d96a27ec8e5e5be
Author: klease <[email protected]>
AuthorDate: Wed Aug 3 15:40:18 2022 +0200
CAMEL-18275: Address issue of completions not being run in SEDA pipeline
(#8015)
When synchronizations are handed over, add a method that allows some
housekeeping to be performed. In
OnCompletionProcessor.OnCompletionSynchronizationAfterConsumer, this method
stores the routeId on the target Exchange.
---
.../apache/camel/spi/SynchronizationVetoable.java | 9 +++++++++
.../apache/camel/impl/engine/DefaultUnitOfWork.java | 7 ++++++-
.../camel/processor/OnCompletionProcessor.java | 20 ++++++++++++++++++++
.../apache/camel/support/SynchronizationAdapter.java | 5 +++++
4 files changed, 40 insertions(+), 1 deletion(-)
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
b/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
index 167ae544037..87270e87d3c 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.spi;
+import org.apache.camel.Exchange;
+
/**
* A vetoable {@link org.apache.camel.spi.Synchronization}.
* <p/>
@@ -40,4 +42,11 @@ public interface SynchronizationVetoable extends
Synchronization {
* @return <tt>true</tt> to allow handover, <tt>false</tt> to deny.
*/
boolean allowHandover();
+
+ /**
+ * A method to perform optional housekeeping when a Synchronization is
being handed over.
+ *
+ * @param target The Exchange to which the synchronizations are being
transferred.
+ */
+ void beforeHandover(Exchange target);
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index fa8c5c75f19..9580c258a6f 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -210,14 +210,19 @@ public class DefaultUnitOfWork implements UnitOfWork {
Synchronization synchronization = it.next();
boolean handover = true;
+ SynchronizationVetoable veto = null;
if (synchronization instanceof SynchronizationVetoable) {
- SynchronizationVetoable veto = (SynchronizationVetoable)
synchronization;
+ veto = (SynchronizationVetoable) synchronization;
handover = veto.allowHandover();
}
if (handover && (filter == null || filter.test(synchronization))) {
log.trace("Handover synchronization {} to: {}",
synchronization, target);
target.adapt(ExtendedExchange.class).addOnCompletion(synchronization);
+ // Allow the synchronization to do housekeeping before transfer
+ if (veto != null) {
+ veto.beforeHandover(target);
+ }
// remove it if its handed over
it.remove();
} else {
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 8261aa451ec..8607957db2e 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -368,6 +368,26 @@ public class OnCompletionProcessor extends
AsyncProcessorSupport implements Trac
return "onFailureOnly";
}
}
+
+ @Override
+ public void beforeHandover(Exchange target) {
+ // The onAfterRoute method will not be called after the handover
+ // To ensure that completions are called, remember the route IDs
here.
+ // Assumption: the fromRouteId on the target Exchange is the route
+ // which owns the completion
+ LOG.debug("beforeHandover from Route {}", target.getFromRouteId());
+ final String exchangeRouteId = target.getFromRouteId();
+ if (routeScoped && exchangeRouteId != null &&
exchangeRouteId.equals(routeId)) {
+ List<String> routeIds =
target.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class);
+ if (routeIds == null) {
+ routeIds = new ArrayList<>();
+
target.setProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, routeIds);
+ }
+ if (!routeIds.contains(exchangeRouteId)) {
+ routeIds.add(exchangeRouteId);
+ }
+ }
+ }
}
private final class OnCompletionSynchronizationBeforeConsumer extends
SynchronizationAdapter implements Ordered {
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java
b/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java
index 6852686017f..7d9ce2c67f0 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/SynchronizationAdapter.java
@@ -63,4 +63,9 @@ public class SynchronizationAdapter implements
SynchronizationVetoable, Ordered,
public void onAfterRoute(Route route, Exchange exchange) {
// noop
}
+
+ @Override
+ public void beforeHandover(Exchange target) {
+ // noop
+ }
}