This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b56401c5450 CAMEL-18186: camel-saga - InMemorySagaCoordinator should 
link parent span to exchange so for example distributed tracing spans are 
linked together.
b56401c5450 is described below

commit b56401c54505dfa988842a1d1378a4f8f6b80e5e
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Jun 30 13:02:14 2024 +0200

    CAMEL-18186: camel-saga - InMemorySagaCoordinator should link parent span 
to exchange so for example distributed tracing spans are linked together.
---
 .../apache/camel/processor/SagaFailuresTest.java   |  2 --
 .../apache/camel/processor/SagaOptionsTest.java    |  2 --
 .../java/org/apache/camel/processor/SagaTest.java  |  2 --
 .../apache/camel/saga/InMemorySagaCoordinator.java | 22 ++++++++++++++++------
 4 files changed, 16 insertions(+), 12 deletions(-)

diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/SagaFailuresTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/SagaFailuresTest.java
index 3663fe55709..3ea66d96c13 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/SagaFailuresTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/SagaFailuresTest.java
@@ -22,10 +22,8 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.saga.InMemorySagaService;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
-@Disabled("TODO: Fix me")
 public class SagaFailuresTest extends ContextTestSupport {
 
     private AtomicInteger maxFailures;
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/SagaOptionsTest.java 
b/core/camel-core/src/test/java/org/apache/camel/processor/SagaOptionsTest.java
index 1e3dc8d2bc2..aa7900bec82 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/SagaOptionsTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/SagaOptionsTest.java
@@ -21,12 +21,10 @@ import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.saga.InMemorySagaService;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.fail;
 
-@Disabled("TODO: Fix me")
 public class SagaOptionsTest extends ContextTestSupport {
 
     @Test
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/SagaTest.java 
b/core/camel-core/src/test/java/org/apache/camel/processor/SagaTest.java
index ed145392440..d36e70c93b1 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/SagaTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SagaTest.java
@@ -29,14 +29,12 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.model.SagaPropagation;
 import org.apache.camel.saga.CamelSagaService;
 import org.apache.camel.saga.InMemorySagaService;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.fail;
 
-@Disabled("TODO: Fix me")
 public class SagaTest extends ContextTestSupport {
 
     private OrderManagerService orderManagerService;
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
 
b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
index 7cb85441216..dbb1c9c5323 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
@@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory;
  */
 public class InMemorySagaCoordinator implements CamelSagaCoordinator {
 
+    private static final String ACTIVE_SPAN_PROPERTY = 
"OpenTracing.activeSpan";
+
     private enum Status {
         RUNNING,
         COMPENSATING,
@@ -192,10 +194,10 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
 
     private CompletableFuture<Boolean> doFinalize(
             Exchange exchange, Endpoint endpoint, CamelSagaStep step, int 
doneAttempts, String description) {
-        populateExchange(exchange, step);
+        Exchange target = createExchange(exchange, endpoint, step);
 
         return CompletableFuture.supplyAsync(() -> {
-            Exchange res = 
camelContext.createFluentProducerTemplate().to(endpoint).withExchange(exchange).send();
+            Exchange res = 
camelContext.createFluentProducerTemplate().to(endpoint).withExchange(target).send();
             Exception ex = res.getException();
             if (ex != null) {
                 throw new RuntimeCamelException(res.getException());
@@ -214,7 +216,7 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
             } else {
                 CompletableFuture<Boolean> future = new CompletableFuture<>();
                 sagaService.getExecutorService().schedule(() -> {
-                    doFinalize(exchange, endpoint, step, currentAttempt, 
description).whenComplete((res, ex) -> {
+                    doFinalize(target, endpoint, step, currentAttempt, 
description).whenComplete((res, ex) -> {
                         if (ex != null) {
                             future.completeExceptionally(ex);
                         } else {
@@ -227,15 +229,23 @@ public class InMemorySagaCoordinator implements 
CamelSagaCoordinator {
         });
     }
 
-    private void populateExchange(Exchange exchange, CamelSagaStep step) {
-        exchange.getMessage().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, 
getId());
+    private Exchange createExchange(Exchange parent, Endpoint endpoint, 
CamelSagaStep step) {
+        Exchange answer = endpoint.createExchange();
+        answer.getMessage().setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, 
getId());
+
+        // preserve span from parent, so we can link this new exchange to the 
parent span for distributed tracing
+        Object span = parent != null ? 
parent.getProperty(ACTIVE_SPAN_PROPERTY) : null;
+        if (span != null) {
+            answer.setProperty(ACTIVE_SPAN_PROPERTY, span);
+        }
 
         Map<String, Object> values = optionValues.get(step);
         if (values != null) {
             for (Map.Entry<String, Object> entry : values.entrySet()) {
-                exchange.getMessage().setHeader(entry.getKey(), 
entry.getValue());
+                answer.getMessage().setHeader(entry.getKey(), 
entry.getValue());
             }
         }
+        return answer;
     }
 
     private <T> List<T> reversed(List<T> list) {

Reply via email to