This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c57368fb7b0f CAMEL-22725: camel-core - Backlog tracer for Aggregate
EIP should have first|last markers (#20210)
c57368fb7b0f is described below
commit c57368fb7b0fd5eeecb35f2c7bc316b3718b7a6e
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Dec 3 12:52:46 2025 +0100
CAMEL-22725: camel-core - Backlog tracer for Aggregate EIP should have
first|last markers (#20210)
* CAMEL-22725: camel-core - Backlog tracer for Aggregate EIP should have
first|last markers
---
.../camel/catalog/main/important-headers.json | 1 +
.../org/apache/camel/catalog/models/aggregate.json | 2 +-
.../src/main/java/org/apache/camel/Exchange.java | 3 +-
.../camel/impl/engine/CamelInternalProcessor.java | 57 ++++++---
.../META-INF/org/apache/camel/model/aggregate.json | 2 +-
.../org/apache/camel/reifier/ProcessorReifier.java | 5 +
.../management/BacklogTracerAggregateTest.java | 142 +++++++++++++++++++++
.../apache/camel/util/ImportantHeaderUtils.java | 1 +
.../camel/dsl/jbang/core/commands/Debug.java | 8 +-
.../core/commands/action/CamelBrowseAction.java | 2 +-
.../core/commands/action/CamelHistoryAction.java | 11 +-
.../core/commands/action/CamelReceiveAction.java | 2 +-
.../core/commands/action/CamelSendAction.java | 2 +-
.../core/commands/action/CamelStubAction.java | 2 +-
.../core/commands/action/CamelTraceAction.java | 8 +-
.../core/commands/action/MessageTableHelper.java | 9 +-
.../commands/action/TransformMessageAction.java | 2 +-
17 files changed, 230 insertions(+), 29 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
index e6deb8a9e2b6..0de189334617 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json
@@ -1,4 +1,5 @@
[
+ "CamelAggregatedCorrelationKey",
"CamelAggregatedSize",
"CamelAsteriskEventName",
"CamelExecExitValue",
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/aggregate.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/aggregate.json
index 96ae9e000afc..ff195f3a9376 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/aggregate.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/aggregate.json
@@ -49,7 +49,7 @@
"CamelAggregatedSize": { "index": 0, "kind": "exchangeProperty",
"displayName": "Aggregated Size", "label": "producer", "required": false,
"javaType": "int", "deprecated": false, "autowired": false, "secret": false,
"important": true, "description": "Number of exchanges that was grouped
together." },
"CamelAggregatedTimeout": { "index": 1, "kind": "exchangeProperty",
"displayName": "Aggregated Timeout", "label": "producer", "required": false,
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"description": "The time in millis this group will timeout" },
"CamelAggregatedCompletedBy": { "index": 2, "kind": "exchangeProperty",
"displayName": "Aggregated Completed By", "label": "producer", "required":
false, "javaType": "String", "deprecated": false, "autowired": false, "secret":
false, "description": "Enum that tell how this group was completed" },
- "CamelAggregatedCorrelationKey": { "index": 3, "kind": "exchangeProperty",
"displayName": "Aggregated Correlation Key", "label": "producer", "required":
false, "javaType": "String", "deprecated": false, "autowired": false, "secret":
false, "description": "The correlation key for this aggregation group" },
+ "CamelAggregatedCorrelationKey": { "index": 3, "kind": "exchangeProperty",
"displayName": "Aggregated Correlation Key", "label": "producer", "required":
false, "javaType": "String", "deprecated": false, "autowired": false, "secret":
false, "important": true, "description": "The correlation key for this
aggregation group" },
"CamelAggregationCompleteCurrentGroup": { "index": 4, "kind":
"exchangeProperty", "displayName": "Aggregation Complete Current Group",
"label": "consumer", "required": false, "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "description": "Input property. Set
to true to force completing the current group. This allows to overrule any
existing completion predicates, sizes, timeouts etc, and complete the group." },
"CamelAggregationCompleteAllGroups": { "index": 5, "kind":
"exchangeProperty", "displayName": "Aggregation Complete All Groups", "label":
"consumer", "required": false, "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "description": "Input property. Set to
true to force completing all the groups (excluding this message). This allows
to overrule any existing completion predicates, sizes, timeouts etc, and
complete the group. This message is considered a [...]
"CamelAggregationCompleteAllGroupsInclusive": { "index": 6, "kind":
"exchangeProperty", "displayName": "Aggregation Complete All Groups Inclusive",
"label": "consumer", "required": false, "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "description": "Input property. Set
to true to force completing all the groups (including this message). This
allows to overrule any existing completion predicates, sizes, timeouts etc, and
complete the group." }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index dc0f0e927070..fc54ffd4cdad 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -78,7 +78,8 @@ public interface Exchange extends VariableAware {
@Metadata(label = "aggregate", description = "Enum that tell how this
group was completed",
enums =
"consumer,force,interval,predicate,size,strategy,timeout", javaType = "String")
String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy";
- @Metadata(label = "aggregate", description = "The correlation key for this
aggregation group", javaType = "String")
+ @Metadata(label = "aggregate", description = "The correlation key for this
aggregation group", javaType = "String",
+ important = true)
String AGGREGATED_CORRELATION_KEY = "CamelAggregatedCorrelationKey";
String AGGREGATED_COLLECTION_GUARD = "CamelAggregatedCollectionGuard";
String AGGREGATION_STRATEGY = "CamelAggregationStrategy";
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 9c3fa20dd89a..16d528afb847 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -690,21 +690,41 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
String routeId = routeDefinition != null ?
routeDefinition.getRouteId() : null;
if (first) {
// use route as pseudo source when first
- String source =
LoggerHelper.getLineNumberLoggerName(routeDefinition);
final long created = exchange.getClock().getCreated();
- DefaultBacklogTracerEventMessage pseudoFirst = new
DefaultBacklogTracerEventMessage(
- camelContext,
- true, false,
backlogTracer.incrementTraceCounter(), created, source, routeId, null, null,
null,
- null,
- null, null,
- level, exchangeId, correlationExchangeId, rest,
template, data);
- if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
- pseudoFirst.setEndpointServiceUrl(esl.getServiceUrl());
-
pseudoFirst.setEndpointServiceProtocol(esl.getServiceProtocol());
-
pseudoFirst.setEndpointServiceMetadata(esl.getServiceMetadata());
+
+ // special for aggregate which output are regarded as a
new first
+ boolean aggregate = false;
+ NamedNode input = routeDefinition != null ?
routeDefinition.getInput() : null;
+ if (processorDefinition.getParent() != null
+ &&
"aggregate".equals(processorDefinition.getParent().getShortName())) {
+ aggregate = true;
+ input = processorDefinition.getParent();
+ }
+ String source =
LoggerHelper.getLineNumberLoggerName(input);
+
+ DefaultBacklogTracerEventMessage pseudoFirst;
+ if (aggregate) {
+ pseudoFirst = new DefaultBacklogTracerEventMessage(
+ camelContext,
+ true, false,
backlogTracer.incrementTraceCounter(), created, source, routeId, input.getId(),
+ null, null,
+ null,
+ input.getShortName(), input.getLabel(),
+ level - 1, exchangeId, correlationExchangeId,
rest, template, data);
+ } else {
+ pseudoFirst = new DefaultBacklogTracerEventMessage(
+ camelContext,
+ true, false,
backlogTracer.incrementTraceCounter(), created, source, routeId, null, null,
null,
+ null, null, null,
+ level, exchangeId, correlationExchangeId,
rest, template, data);
+ if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
+
pseudoFirst.setEndpointServiceUrl(esl.getServiceUrl());
+
pseudoFirst.setEndpointServiceProtocol(esl.getServiceProtocol());
+
pseudoFirst.setEndpointServiceMetadata(esl.getServiceMetadata());
+ }
}
backlogTracer.traceEvent(pseudoFirst);
-
exchange.getExchangeExtension().addOnCompletion(createOnCompletion(source,
pseudoFirst));
+
exchange.getExchangeExtension().addOnCompletion(createOnCompletion(source,
aggregate, pseudoFirst));
}
String source =
LoggerHelper.getLineNumberLoggerName(processorDefinition);
DefaultBacklogTracerEventMessage event = new
DefaultBacklogTracerEventMessage(
@@ -721,7 +741,8 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
return null;
}
- private SynchronizationAdapter createOnCompletion(String source,
DefaultBacklogTracerEventMessage pseudoFirst) {
+ private SynchronizationAdapter createOnCompletion(
+ String source, boolean aggregate,
DefaultBacklogTracerEventMessage pseudoFirst) {
return new SynchronizationAdapter() {
@Override
public void onDone(Exchange exchange) {
@@ -732,15 +753,19 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
boolean includeExchangeVariables =
backlogTracer.isIncludeExchangeVariables();
long created = exchange.getClock().getCreated();
- int level = processorDefinition.getLevel();
+ int level = pseudoFirst.getToNodeLevel();
+ // aggregate is special
+ String toNode = aggregate ? pseudoFirst.getToNode() : null;
+ String toNodeShortName = aggregate ?
pseudoFirst.getToNodeShortName() : null;
+ String toNodeLabel = aggregate ?
pseudoFirst.getToNodeLabel() : null;
JsonObject data =
MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties,
includeExchangeVariables, true,
true, backlogTracer.isBodyIncludeStreams(),
backlogTracer.isBodyIncludeFiles(),
backlogTracer.getBodyMaxChars());
DefaultBacklogTracerEventMessage pseudoLast = new
DefaultBacklogTracerEventMessage(
camelContext,
- false, true,
backlogTracer.incrementTraceCounter(), created, source, routeId, null, null,
null,
- null, null, null,
+ false, true,
backlogTracer.incrementTraceCounter(), created, source, routeId, toNode, null,
null,
+ null, toNodeShortName, toNodeLabel,
level, exchangeId, correlationExchangeId, rest,
template, data);
backlogTracer.traceEvent(pseudoLast);
doneProcessing(exchange, pseudoLast);
diff --git
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/aggregate.json
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/aggregate.json
index 96ae9e000afc..ff195f3a9376 100644
---
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/aggregate.json
+++
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/aggregate.json
@@ -49,7 +49,7 @@
"CamelAggregatedSize": { "index": 0, "kind": "exchangeProperty",
"displayName": "Aggregated Size", "label": "producer", "required": false,
"javaType": "int", "deprecated": false, "autowired": false, "secret": false,
"important": true, "description": "Number of exchanges that was grouped
together." },
"CamelAggregatedTimeout": { "index": 1, "kind": "exchangeProperty",
"displayName": "Aggregated Timeout", "label": "producer", "required": false,
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"description": "The time in millis this group will timeout" },
"CamelAggregatedCompletedBy": { "index": 2, "kind": "exchangeProperty",
"displayName": "Aggregated Completed By", "label": "producer", "required":
false, "javaType": "String", "deprecated": false, "autowired": false, "secret":
false, "description": "Enum that tell how this group was completed" },
- "CamelAggregatedCorrelationKey": { "index": 3, "kind": "exchangeProperty",
"displayName": "Aggregated Correlation Key", "label": "producer", "required":
false, "javaType": "String", "deprecated": false, "autowired": false, "secret":
false, "description": "The correlation key for this aggregation group" },
+ "CamelAggregatedCorrelationKey": { "index": 3, "kind": "exchangeProperty",
"displayName": "Aggregated Correlation Key", "label": "producer", "required":
false, "javaType": "String", "deprecated": false, "autowired": false, "secret":
false, "important": true, "description": "The correlation key for this
aggregation group" },
"CamelAggregationCompleteCurrentGroup": { "index": 4, "kind":
"exchangeProperty", "displayName": "Aggregation Complete Current Group",
"label": "consumer", "required": false, "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "description": "Input property. Set
to true to force completing the current group. This allows to overrule any
existing completion predicates, sizes, timeouts etc, and complete the group." },
"CamelAggregationCompleteAllGroups": { "index": 5, "kind":
"exchangeProperty", "displayName": "Aggregation Complete All Groups", "label":
"consumer", "required": false, "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "description": "Input property. Set to
true to force completing all the groups (excluding this message). This allows
to overrule any existing completion predicates, sizes, timeouts etc, and
complete the group. This message is considered a [...]
"CamelAggregationCompleteAllGroupsInclusive": { "index": 6, "kind":
"exchangeProperty", "displayName": "Aggregation Complete All Groups Inclusive",
"label": "consumer", "required": false, "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "description": "Input property. Set
to true to force completing all the groups (including this message). This
allows to overrule any existing completion predicates, sizes, timeouts etc, and
complete the group." }
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 326ab94d78aa..7c92b00c5b76 100644
---
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -686,6 +686,11 @@ public abstract class ProcessorReifier<T extends
ProcessorDefinition<?>> extends
if (route != null && !route.getOutputs().isEmpty()) {
first = route.getOutputs().get(0) == definition;
}
+ // special for aggregate which output are regarded as a new first
+ NamedNode targetOutputDef = child != null ? child : definition;
+ if (!first && targetOutputDef.getParent() instanceof
AggregateDefinition agg) {
+ first = agg.getOutputs().get(0) == targetOutputDef;
+ }
// initialize the channel
channel.initChannel(this.route, definition, child, interceptors,
processor, route, first);
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java
new file mode 100644
index 000000000000..44cc2dbda37e
--- /dev/null
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.List;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.BacklogTracerEventMessage;
+import org.apache.camel.util.json.JsonObject;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@DisabledOnOs(OS.AIX)
+public class BacklogTracerAggregateTest extends ManagementTestSupport {
+ // Sends three messages through a route with an aggregate (completionSize 3) and checks the first/last markers in the backlog tracer dump.
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testBacklogTracerAggregate() throws Exception {
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on
+ = new ObjectName("org.apache.camel:context=" +
context.getManagementName() + ",type=tracer,name=BacklogTracer");
+ assertNotNull(on);
+ mbeanServer.isRegistered(on); // NOTE(review): the boolean result is discarded, so this line verifies nothing - consider asserting it
+ // the tracer MBean must report itself as enabled (createRouteBuilder below turns backlog tracing on)
+ Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals(Boolean.TRUE, enabled, "Should be enabled");
+ // expectations: all three inputs reach mock:input and mock:output, mock:result receives the single joined body
+ getMockEndpoint("mock:input").expectedMessageCount(3);
+ getMockEndpoint("mock:output").expectedMessageCount(3);
+ getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C");
+ // three sends, matching the completionSize of 3 configured on the aggregate
+ template.sendBody("direct:start", "A");
+ template.sendBody("direct:start", "B");
+ template.sendBody("direct:start", "C");
+
+ assertMockEndpointsSatisfied();
+ // fetch every traced event through the tracer MBean operation
+ List<BacklogTracerEventMessage> events
+ = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on,
"dumpAllTracedMessages", null, null);
+
+ assertNotNull(events);
+ assertEquals(19, events.size()); // indexes 0-14 are checked below as three groups of 5 (one per input), 15-18 as the aggregated output
+
+ // this is the 1st incoming message: first marker at index 0, last marker at index 4
+ BacklogTracerEventMessage event = events.get(0);
+ Assertions.assertTrue(event.isFirst());
+ Assertions.assertFalse(event.isLast());
+ Assertions.assertEquals("direct://start", event.getEndpointUri());
+ JsonObject jo = (JsonObject) event.asJSon();
+ Assertions.assertEquals("{type=java.lang.String, value=A}",
jo.getMap("message").get("body").toString());
+ event = events.get(4);
+ Assertions.assertFalse(event.isFirst());
+ Assertions.assertTrue(event.isLast());
+ Assertions.assertEquals("direct://start", event.getEndpointUri());
+ jo = (JsonObject) event.asJSon();
+ Assertions.assertEquals("{type=java.lang.String, value=A}",
jo.getMap("message").get("body").toString());
+
+ // this is the 2nd incoming message: first marker at index 5, last marker at index 9
+ event = events.get(5);
+ Assertions.assertTrue(event.isFirst());
+ Assertions.assertFalse(event.isLast());
+ Assertions.assertEquals("direct://start", event.getEndpointUri());
+ jo = (JsonObject) event.asJSon();
+ Assertions.assertEquals("{type=java.lang.String, value=B}",
jo.getMap("message").get("body").toString());
+ event = events.get(9);
+ Assertions.assertFalse(event.isFirst());
+ Assertions.assertTrue(event.isLast());
+ Assertions.assertEquals("direct://start", event.getEndpointUri());
+ jo = (JsonObject) event.asJSon();
+ Assertions.assertEquals("{type=java.lang.String, value=B}",
jo.getMap("message").get("body").toString());
+
+ // this is the 3rd incoming message: first marker at index 10, last marker at index 14
+ event = events.get(10);
+ Assertions.assertTrue(event.isFirst());
+ Assertions.assertFalse(event.isLast());
+ Assertions.assertEquals("direct://start", event.getEndpointUri());
+ jo = (JsonObject) event.asJSon();
+ Assertions.assertEquals("{type=java.lang.String, value=C}",
jo.getMap("message").get("body").toString());
+ event = events.get(14);
+ Assertions.assertFalse(event.isFirst());
+ Assertions.assertTrue(event.isLast());
+ Assertions.assertEquals("direct://start", event.getEndpointUri());
+ jo = (JsonObject) event.asJSon();
+ Assertions.assertEquals("{type=java.lang.String, value=C}",
jo.getMap("message").get("body").toString());
+
+ // this is the outgoing aggregation result: first marker at index 15, last marker at index 18, both expected to have no endpoint URI
+ event = events.get(15);
+ Assertions.assertTrue(event.isFirst());
+ Assertions.assertFalse(event.isLast());
+ Assertions.assertNull(event.getEndpointUri());
+ jo = (JsonObject) event.asJSon();
+ Assertions.assertEquals("{type=java.lang.String, value=A,B,C}",
jo.getMap("message").get("body").toString());
+ event = events.get(18);
+ Assertions.assertFalse(event.isFirst());
+ Assertions.assertTrue(event.isLast());
+ Assertions.assertNull(event.getEndpointUri());
+ jo = (JsonObject) event.asJSon();
+ Assertions.assertEquals("{type=java.lang.String, value=A,B,C}",
jo.getMap("message").get("body").toString());
+ }
+ // Route under test: direct:start -> mock:input -> aggregate -> (log -> mock:result inside the aggregate) -> mock:output
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ context.setBacklogTracing(true);
+ // the aggregate uses constant(true) as correlation expression, completionSize 3 and a comma-joining string strategy
+ from("direct:start").routeId("myRoute")
+ .to("mock:input").id("input")
+
.aggregate(constant(true)).completionSize(3).aggregationStrategy(AggregationStrategies.string(",")).id("aggregate")
+ .log("aggregated ${body}").id("log")
+ .to("mock:result").id("result")
+ .end()
+ .to("mock:output").id("output");
+ }
+ };
+ }
+
+}
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
b/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
index c757c1d58240..815558277eb2 100644
---
a/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
+++
b/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java
@@ -27,6 +27,7 @@ public final class ImportantHeaderUtils {
Arrays.asList(
// Generated by camel build tools - do NOT edit this list!
// IMPORTANT-HEADER-KEYS: START
+ "CamelAggregatedCorrelationKey",
"CamelAggregatedSize",
"CamelAsteriskEventName",
"CamelExecExitValue",
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
index 11eb406c9b8e..c2bf8cbac795 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Debug.java
@@ -664,6 +664,10 @@ public class Debug extends Run {
row.location = jo.getString("location");
row.routeId = jo.getString("routeId");
row.nodeId = jo.getString("nodeId");
+ if ("aggregate".equals(jo.getString("nodeShortName")))
{
+ row.aggregate = new JsonObject();
+ row.aggregate.put("nodeLabel",
jo.getString("nodeLabel"));
+ }
String uri = jo.getString("endpointUri");
if (uri != null) {
row.endpoint = new JsonObject();
@@ -1063,7 +1067,8 @@ public class Debug extends Run {
}
private String getDataAsTable(SuspendedRow r) {
- return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern,
r.endpoint, r.endpointService, r.message,
+ return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern,
r.aggregate, r.endpoint, r.endpointService,
+ r.message,
r.exception);
}
@@ -1149,6 +1154,7 @@ public class Debug extends Run {
long elapsed;
boolean done;
boolean failed;
+ JsonObject aggregate;
JsonObject endpoint;
JsonObject endpointService;
JsonObject message;
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
index e3c6cad05318..7b6479f258d3 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
@@ -240,7 +240,7 @@ public class CamelBrowseAction extends ActionBaseCommand {
}
JsonObject ep = new JsonObject();
ep.put("endpoint", row.uri);
- String table = tableHelper.getDataAsTable(exchangeId,
null, ep, null, message, null);
+ String table = tableHelper.getDataAsTable(exchangeId,
null, null, ep, null, message, null);
String header = String.format("Browse Message: (%s/%s)",
row.position + i + 1,
row.position + row.messages.size());
if (loggingColor) {
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
index c653f234fd06..fc1eec3285f4 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
@@ -180,7 +180,7 @@ public class CamelHistoryAction extends ActionWatchCommand {
}
private String getProcessor(Row r) {
- if (r.first || r.last) {
+ if (r.endpoint != null && (r.first || r.last)) {
return "from[" + r.endpoint.getString("endpoint") + "]";
} else if (r.nodeLabel != null) {
return StringHelper.padString(r.nodeLevel, 2) + r.nodeLabel;
@@ -337,7 +337,7 @@ public class CamelHistoryAction extends ActionWatchCommand {
Row next = i > 0 && i < rows.size() + 2 ? rows.get(i + 1) : null;
String uri = r.endpoint != null ? r.endpoint.getString("endpoint")
: null;
- Row t = r.first ? r : next; // if sending to endpoint then we
should find details in the next step as they are response
+ Row t = uri != null && r.first ? r : next; // if sending to
endpoint then we should find details in the next step as they are response
if (uri != null && t != null) {
StringJoiner sj = new StringJoiner(" ");
var map = extractComponentModel(uri, t);
@@ -376,6 +376,13 @@ public class CamelHistoryAction extends ActionWatchCommand
{
String sv = map.remove("CamelSplitSize");
r.summary = "Split (" + sv + ")";
}
+ } else if ("aggregate".equals(r.nodeShortName)) {
+ if (r.first) {
+ var map = extractEipModel("aggregate", r);
+ String ak = map.remove("CamelAggregatedCorrelationKey");
+ String as = map.remove("CamelAggregatedSize");
+ r.summary = "Aggregate (key:" + ak + " size:" + as + ")";
+ }
}
}
}
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
index 5d143e1f74da..043f0766d00f 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
@@ -892,7 +892,7 @@ public class CamelReceiveAction extends ActionBaseCommand {
}
private String getDataAsTable(Row r) {
- return tableHelper.getDataAsTable(null, null, r.endpoint,
r.endpointService, r.message, null);
+ return tableHelper.getDataAsTable(null, null, null, r.endpoint,
r.endpointService, r.message, null);
}
protected String getEndpointUri(StatusRow r) {
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
index 3908bfb73df2..2ddbeae59a23 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
@@ -249,7 +249,7 @@ public class CamelSendAction extends ActionBaseCommand {
tableHelper.setShowExchangeProperties(showExchangeProperties);
tableHelper.setShowExchangeVariables(showExchangeVariables);
String mep = (reply || replyFile != null) ? "InOut" :
"InOnly";
- String table = tableHelper.getDataAsTable(exchangeId,
mep, jo, null, message, cause);
+ String table = tableHelper.getDataAsTable(exchangeId,
mep, null, jo, null, message, cause);
printer().println(table);
}
}
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelStubAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelStubAction.java
index b9b63b692c43..00bd890fc3d6 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelStubAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelStubAction.java
@@ -294,7 +294,7 @@ public class CamelStubAction extends ActionWatchCommand {
}
String data
=
tableHelper.getDataAsTable(root.getString("exchangeId"),
root.getString("exchangePattern"),
- row.endpoint, null, root, null);
+ null, row.endpoint, null, root, null);
if (data != null) {
String[] lines =
data.split(System.lineSeparator());
if (lines.length > 0) {
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
index 97b1d51c21ea..d772f65473bf 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
@@ -580,6 +580,10 @@ public class CamelTraceAction extends ActionBaseCommand {
row.location = jo.getString("location");
row.routeId = jo.getString("routeId");
row.nodeId = jo.getString("nodeId");
+ if ("aggregate".equals(jo.getString("nodeShortName"))) {
+ row.aggregate = new JsonObject();
+ row.aggregate.put("nodeLabel",
jo.getString("nodeLabel"));
+ }
String uri = jo.getString("endpointUri");
if (uri != null) {
row.endpoint = new JsonObject();
@@ -876,7 +880,8 @@ public class CamelTraceAction extends ActionBaseCommand {
}
private String getDataAsTable(Row r) {
- return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern,
r.endpoint, r.endpointService, r.message,
+ return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern,
r.aggregate, r.endpoint, r.endpointService,
+ r.message,
r.exception);
}
@@ -965,6 +970,7 @@ public class CamelTraceAction extends ActionBaseCommand {
long elapsed;
boolean done;
boolean failed;
+ JsonObject aggregate;
JsonObject endpoint;
JsonObject endpointService;
JsonObject message;
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/MessageTableHelper.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/MessageTableHelper.java
index 72422652175f..cfbb4bc55f09 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/MessageTableHelper.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/MessageTableHelper.java
@@ -93,7 +93,7 @@ public class MessageTableHelper {
public String getDataAsTable(
String exchangeId, String exchangePattern,
- JsonObject endpoint, JsonObject endpointService,
+ JsonObject aggregate, JsonObject endpoint, JsonObject
endpointService,
JsonObject root, JsonObject cause) {
List<TableRow> rows = new ArrayList<>();
@@ -108,6 +108,13 @@ public class MessageTableHelper {
String tab5 = null;
String tab6 = null;
+ if (aggregate != null) {
+ eRow = new TableRow("Aggregate", null, null,
aggregate.getString("nodeLabel"));
+ tab0 = AsciiTable.getTable(AsciiTable.NO_BORDERS, List.of(eRow),
Arrays.asList(
+ new Column().dataAlign(HorizontalAlign.LEFT)
+ .minWidth(showExchangeProperties ||
showExchangeVariables ? 12 : 10).with(TableRow::kindAsString),
+ new
Column().dataAlign(HorizontalAlign.LEFT).with(TableRow::valueAsString)));
+ }
if (endpoint != null) {
eRow = new TableRow("Endpoint", null, null,
endpoint.getString("endpoint"));
tab0 = AsciiTable.getTable(AsciiTable.NO_BORDERS, List.of(eRow),
Arrays.asList(
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/TransformMessageAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/TransformMessageAction.java
index dc5c711bd011..274af9d325c2 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/TransformMessageAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/TransformMessageAction.java
@@ -284,7 +284,7 @@ public class TransformMessageAction extends
ActionWatchCommand {
tableHelper.setPretty(pretty);
tableHelper.setLoggingColor(loggingColor);
tableHelper.setShowExchangeProperties(showExchangeProperties);
- String table = tableHelper.getDataAsTable(exchangeId,
"InOut", null, null, message, cause);
+ String table = tableHelper.getDataAsTable(exchangeId,
"InOut", null, null, null, message, cause);
printer().println(table);
}
}