This is an automated email from the ASF dual-hosted git repository.
cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 12c4a64487e CAMEL-20981: Fix ceOverrides option with YAML DSL
12c4a64487e is described below
commit 12c4a64487e664061cd9b0f15de6ccb0397bdfaa
Author: Christoph Deppisch <[email protected]>
AuthorDate: Mon Jul 15 21:06:39 2024 +0200
CAMEL-20981: Fix ceOverrides option with YAML DSL
- Makes sure that CloudEvent attributes use dash style keys even with YAML
DSL where key get converted to camelCase style
- Also support property placeholders when setting ceOverrides option for
CloudEvent attributes
- Add some unit tests
---
.../knative/ce/AbstractCloudEventProcessor.java | 7 +-
.../component/knative/KnativeComponentTest.java | 16 +++-
.../src/test/resources/environment.json | 16 ++++
.../src/test/resources/environment_classic.json | 16 ++++
.../component/knative/http/KnativeHttpTest.java | 91 ++++++++++++++++++++++
5 files changed, 144 insertions(+), 2 deletions(-)
diff --git
a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index 16cf27dd3b4..0e7f258d338 100644
---
a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
+++
b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -29,6 +29,7 @@ import org.apache.camel.cloudevents.CloudEvent;
import org.apache.camel.component.knative.KnativeEndpoint;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeResource;
+import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,7 +126,11 @@ abstract class AbstractCloudEventProcessor implements
CloudEventProcessor {
return DateTimeFormatter.ISO_INSTANT.format(created);
});
- headers.putAll(service.getCeOverrides());
+ for (Map.Entry<String, String> ceOverride :
service.getCeOverrides().entrySet()) {
+ // when using keys in YAML DSL camelCase is being used by
default -convert to dash due to CloudEvents spec
+ headers.put(StringHelper.camelCaseToDash(ceOverride.getKey()),
+
exchange.getContext().resolvePropertyPlaceholders(ceOverride.getValue()));
+ }
};
}
diff --git
a/components/camel-knative/camel-knative-component/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
b/components/camel-knative/camel-knative-component/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
index dd3369dd1de..200e3bfa0a4 100644
---
a/components/camel-knative/camel-knative-component/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
+++
b/components/camel-knative/camel-knative-component/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.knative;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.apache.camel.CamelContext;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
@@ -61,7 +65,7 @@ public class KnativeComponentTest {
void testLoadEnvironment(String resource) throws Exception {
KnativeEnvironment env = mandatoryLoadFromResource(context, resource);
- assertThat(env.stream()).hasSize(6);
+ assertThat(env.stream()).hasSize(7);
assertThat(env.stream()).anyMatch(s -> s.getType() ==
Knative.Type.channel);
assertThat(env.stream()).anyMatch(s -> s.getType() ==
Knative.Type.endpoint);
assertThat(env.stream()).anyMatch(s -> s.getType() ==
Knative.Type.event);
@@ -114,9 +118,19 @@ public class KnativeComponentTest {
{
KnativeEndpoint endpoint =
context.getEndpoint("knative:event/event", KnativeEndpoint.class);
assertThat(endpoint.lookupServiceDefinition("example-broker",
Knative.EndpointKind.sink)).isPresent();
+ assertThat(endpoint.lookupServiceDefinition("example-broker-1",
Knative.EndpointKind.sink)).isPresent();
assertThat(endpoint.lookupServiceDefinition("c1",
Knative.EndpointKind.source)).isNotPresent();
assertThat(endpoint.lookupServiceDefinition("example-broker",
Knative.EndpointKind.sink)).isPresent().get()
.hasFieldOrPropertyWithValue("url",
"http://broker-example/default/example-broker");
+ assertThat(endpoint.lookupServiceDefinition("example-broker",
Knative.EndpointKind.sink)).isPresent().get()
+ .hasFieldOrPropertyWithValue("ceOverrides",
Collections.emptyMap());
+ assertThat(endpoint.lookupServiceDefinition("example-broker-1",
Knative.EndpointKind.sink)).isPresent().get()
+ .hasFieldOrPropertyWithValue("url",
"http://broker-example/default/example-broker-1");
+ assertThat(endpoint.lookupServiceDefinition("example-broker-1",
Knative.EndpointKind.sink)).isPresent().get()
+ .hasFieldOrPropertyWithValue("ceOverrides",
+ Stream.of("ce-source=custom-source",
"ce-type=custom-type", "ce-subject=custom-subject")
+ .map(it -> it.split("="))
+ .collect(Collectors.toMap(it -> it[0], it
-> it[1])));
}
{
KnativeEndpoint endpoint =
context.getEndpoint("knative:event/evt1", KnativeEndpoint.class);
diff --git
a/components/camel-knative/camel-knative-component/src/test/resources/environment.json
b/components/camel-knative/camel-knative-component/src/test/resources/environment.json
index eeb55b66bdf..7783d0a11c9 100644
---
a/components/camel-knative/camel-knative-component/src/test/resources/environment.json
+++
b/components/camel-knative/camel-knative-component/src/test/resources/environment.json
@@ -41,6 +41,22 @@
"knative.name": "example-broker"
}
},
+ {
+ "type": "event",
+ "name": "example-broker-1",
+ "url": "http://broker-example/default/example-broker-1",
+ "metadata": {
+ "camel.endpoint.kind": "sink",
+ "knative.apiVersion": "eventing.knative.dev/v1",
+ "knative.kind": "Broker",
+ "knative.name": "example-broker-1"
+ },
+ "ceOverrides": {
+ "ce-type": "custom-type",
+ "ce-source": "custom-source",
+ "ce-subject": "custom-subject"
+ }
+ },
{
"type": "event",
"name": "evt1",
diff --git
a/components/camel-knative/camel-knative-component/src/test/resources/environment_classic.json
b/components/camel-knative/camel-knative-component/src/test/resources/environment_classic.json
index 1be2d047569..cd9120bbc5f 100644
---
a/components/camel-knative/camel-knative-component/src/test/resources/environment_classic.json
+++
b/components/camel-knative/camel-knative-component/src/test/resources/environment_classic.json
@@ -41,6 +41,22 @@
"knative.name": "example-broker"
}
},
+ {
+ "type": "event",
+ "name": "example-broker-1",
+ "url": "http://broker-example/default/example-broker-1",
+ "metadata": {
+ "camel.endpoint.kind": "sink",
+ "knative.apiVersion": "eventing.knative.dev/v1",
+ "knative.kind": "Broker",
+ "knative.name": "example-broker-1"
+ },
+ "ceOverrides": {
+ "ce-type": "custom-type",
+ "ce-source": "custom-source",
+ "ce-subject": "custom-subject"
+ }
+ },
{
"type": "event",
"name": "evt1",
diff --git
a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 328197bd8ad..40bf8a15c48 100644
---
a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++
b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -44,6 +44,7 @@ import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -1478,6 +1479,51 @@ public class KnativeHttpTest {
}
}
+ @ParameterizedTest
+ @EnumSource(CloudEvents.class)
+ void testHeadersOverrideFromEnvPropertyPlaceholder(CloudEvent ce) throws
Exception {
+ final KnativeHttpServer server = new KnativeHttpServer(context);
+ final String typeHeaderKey = httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
+ final String typeHeaderVal = UUID.randomUUID().toString();
+ final String sourceHeaderKey = httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
+ final String sourceHeaderVal = UUID.randomUUID().toString();
+
+ configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ String.format("http://%s:%d", server.getHost(),
server.getPort()),
+ Map.of(
+ Knative.KNATIVE_CLOUD_EVENT_TYPE,
"org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX +
typeHeaderKey,
+ "{{someProperty:%s}}".formatted(typeHeaderVal),
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX +
sourceHeaderKey,
+
"{{someProperty:%s}}".formatted(sourceHeaderVal))));
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .to("knative:endpoint/ep");
+ });
+
+ context.start();
+ try {
+ server.start();
+ template.sendBody("direct:start", "");
+
+ HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+ assertThat(request.getHeader(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
+ assertThat(request.getHeader(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+ assertThat(request.getHeader(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
+
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+ } finally {
+ server.stop();
+ }
+ }
+
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromURI(CloudEvent ce) throws Exception {
@@ -1566,6 +1612,51 @@ public class KnativeHttpTest {
}
}
+ @ParameterizedTest
+ @EnumSource(CloudEvents.class)
+ void testHeadersOverrideFromYamlConf(CloudEvent ce) throws Exception {
+ final KnativeHttpServer server = new KnativeHttpServer(context);
+ final String typeHeaderKey =
StringHelper.dashToCamelCase(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_TYPE));
+ final String typeHeaderVal = UUID.randomUUID().toString();
+ final String sourceHeaderKey =
StringHelper.dashToCamelCase(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_SOURCE));
+ final String sourceHeaderVal = UUID.randomUUID().toString();
+
+ KnativeComponent component = configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ String.format("http://%s:%d", server.getHost(),
server.getPort()),
+ Map.of(
+ Knative.KNATIVE_CLOUD_EVENT_TYPE,
"org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain")));
+
+ component.getConfiguration().setCeOverride(Map.of(
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey,
typeHeaderVal,
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey,
sourceHeaderVal));
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .to("knative:endpoint/ep");
+ });
+
+ context.start();
+ try {
+ server.start();
+ template.sendBody("direct:start", "");
+
+ HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
+ assertThat(request.getHeader(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
+ assertThat(request.getHeader(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+ assertThat(request.getHeader(httpAttribute(ce,
CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
+
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+ } finally {
+ server.stop();
+ }
+ }
+
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws
Exception {