This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new b56401c5450 CAMEL-18186: camel-saga - InMemorySagaCoordinator should
link parent span to exchange so for example distributed tracing spans are
linked together.
b56401c5450 is described below
commit b56401c54505dfa988842a1d1378a4f8f6b80e5e
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Jun 30 13:02:14 2024 +0200
CAMEL-18186: camel-saga - InMemorySagaCoordinator should link parent span
to exchange so for example distributed tracing spans are linked together.
---
.../apache/camel/processor/SagaFailuresTest.java | 2 --
.../apache/camel/processor/SagaOptionsTest.java | 2 --
.../java/org/apache/camel/processor/SagaTest.java | 2 --
.../apache/camel/saga/InMemorySagaCoordinator.java | 22 ++++++++++++++++------
4 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SagaFailuresTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SagaFailuresTest.java
index 3663fe55709..3ea66d96c13 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/SagaFailuresTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SagaFailuresTest.java
@@ -22,10 +22,8 @@ import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.saga.InMemorySagaService;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-@Disabled("TODO: Fix me")
public class SagaFailuresTest extends ContextTestSupport {
private AtomicInteger maxFailures;
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SagaOptionsTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SagaOptionsTest.java
index 1e3dc8d2bc2..aa7900bec82 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/SagaOptionsTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SagaOptionsTest.java
@@ -21,12 +21,10 @@ import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.saga.InMemorySagaService;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.fail;
-@Disabled("TODO: Fix me")
public class SagaOptionsTest extends ContextTestSupport {
@Test
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SagaTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SagaTest.java
index ed145392440..d36e70c93b1 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/SagaTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SagaTest.java
@@ -29,14 +29,12 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.SagaPropagation;
import org.apache.camel.saga.CamelSagaService;
import org.apache.camel.saga.InMemorySagaService;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.fail;
-@Disabled("TODO: Fix me")
public class SagaTest extends ContextTestSupport {
private OrderManagerService orderManagerService;
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
index 7cb85441216..dbb1c9c5323 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
@@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory;
*/
public class InMemorySagaCoordinator implements CamelSagaCoordinator {
+ private static final String ACTIVE_SPAN_PROPERTY = "OpenTracing.activeSpan";
+
private enum Status {
RUNNING,
COMPENSATING,
@@ -192,10 +194,10 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
private CompletableFuture<Boolean> doFinalize(
Exchange exchange, Endpoint endpoint, CamelSagaStep step, int doneAttempts, String description) {
- populateExchange(exchange, step);
+ Exchange target = createExchange(exchange, endpoint, step);
return CompletableFuture.supplyAsync(() -> {
- Exchange res = camelContext.createFluentProducerTemplate().to(endpoint).withExchange(exchange).send();
+ Exchange res = camelContext.createFluentProducerTemplate().to(endpoint).withExchange(target).send();
Exception ex = res.getException();
if (ex != null) {
throw new RuntimeCamelException(res.getException());
@@ -214,7 +216,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
} else {
CompletableFuture<Boolean> future = new CompletableFuture<>();
sagaService.getExecutorService().schedule(() -> {
- doFinalize(exchange, endpoint, step, currentAttempt, description).whenComplete((res, ex) -> {
+ doFinalize(target, endpoint, step, currentAttempt, description).whenComplete((res, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
} else {
@@ -227,15 +229,23 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
});
}
- private void populateExchange(Exchange exchange, CamelSagaStep step) {
- exchange.getMessage().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, getId());
+ private Exchange createExchange(Exchange parent, Endpoint endpoint, CamelSagaStep step) {
+ Exchange answer = endpoint.createExchange();
+ answer.getMessage().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, getId());
+
+ // preserve span from parent, so we can link this new exchange to the parent span for distributed tracing
+ Object span = parent != null ? parent.getProperty(ACTIVE_SPAN_PROPERTY) : null;
+ if (span != null) {
+ answer.setProperty(ACTIVE_SPAN_PROPERTY, span);
+ }
Map<String, Object> values = optionValues.get(step);
if (values != null) {
for (Map.Entry<String, Object> entry : values.entrySet()) {
- exchange.getMessage().setHeader(entry.getKey(), entry.getValue());
+ answer.getMessage().setHeader(entry.getKey(), entry.getValue());
}
}
+ return answer;
}
private <T> List<T> reversed(List<T> list) {