This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push:
new 177022c CamelCloudEventXXX not converted to CE header #177
new f4c290e Merge pull request #178 from lburgazzoli/github-177
177022c is described below
commit 177022c3f2935f80d3d2ab59bdcf50407dc7faef
Author: lburgazzoli <[email protected]>
AuthorDate: Sun Oct 27 17:55:01 2019 +0100
CamelCloudEventXXX not converted to CE header #177
---
.../camel/component/knative/spi/CloudEvents.java | 2 +-
.../component/knative/http/KnativeHttpTest.java | 120 ++++++++++++++++++++-
.../knative/ce/AbstractCloudEventProcessor.java | 41 ++++---
3 files changed, 143 insertions(+), 20 deletions(-)
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
index dd8ab96..6118ef0 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
@@ -34,7 +34,7 @@ public enum CloudEvents implements CloudEvent {
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "CE-EventID",
"eventID"),
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME,
"CE-EventTime", "eventTime"),
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL,
"CE-SchemaURL", "schemaURL"),
- Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE,
"ContentType", "contentType"),
+ Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE,
"Content-Type", "contentType"),
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_EXTENSIONS,
"CE-Extensions", "extensions")
)
)),
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 9628838..390efd4 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -262,7 +262,6 @@ public class KnativeHttpTest {
public void configure() throws Exception {
from("knative:endpoint/myEndpoint")
.to("mock:ce");
-
from("direct:source")
.toF("undertow:http://localhost:%d/a/path", port);
}
@@ -272,11 +271,16 @@ public class KnativeHttpTest {
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
ce.version());
+
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(),
ce.version());
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
"org.apache.camel.event");
+
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(),
"org.apache.camel.event");
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
"myEventID");
+
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(),
"myEventID");
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
"/somewhere");
+
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(),
"/somewhere");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e ->
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+ mock.expectedMessagesMatches(e ->
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
@@ -918,8 +922,7 @@ public class KnativeHttpTest {
public void configure() throws Exception {
from("direct:source")
.to("knative:event/myEvent?kind=MyObject&apiVersion=v1");
-
- fromF("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
+ from("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
.to("mock:ce");
}
});
@@ -1370,8 +1373,6 @@ public class KnativeHttpTest {
}
}
-
-
@ParameterizedTest
@MethodSource("provideCloudEventsImplementations")
void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
@@ -1434,5 +1435,114 @@ public class KnativeHttpTest {
server.stop();
}
}
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws
Exception {
+ configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )
+ )
+ );
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+ Undertow server = Undertow.builder()
+ .addHttpListener(port, "localhost")
+ .setHandler(se -> {
+ exchange.set(se);
+ latch.countDown();
+ })
+ .build();
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("myType")
+ .to("knative:endpoint/ep");
+ });
+
+ context.start();
+ try {
+ server.start();
+ template.sendBody("direct:start", "");
+
+ latch.await();
+
+ HeaderMap headers = exchange.get().getRequestHeaders();
+
+
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType");
+
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+ } finally {
+ server.stop();
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws
Exception {
+ configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )
+ )
+ );
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+ Undertow server = Undertow.builder()
+ .addHttpListener(port, "localhost")
+ .setHandler(se -> {
+ exchange.set(se);
+ latch.countDown();
+ })
+ .build();
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()).constant("fromCEHeader")
+
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("fromCamelHeader")
+ .to("knative:endpoint/ep");
+ });
+
+ context.start();
+ try {
+ server.start();
+ template.sendBody("direct:start", "");
+
+ latch.await();
+
+ HeaderMap headers = exchange.get().getRequestHeaders();
+
+
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader");
+
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+ } finally {
+ server.stop();
+ }
+ }
}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index 4cd638a..9a4e550 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -22,6 +22,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Supplier;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -55,7 +56,7 @@ abstract class AbstractCloudEventProcessor implements
CloudEventProcessor {
final Map<String, Object> headers =
exchange.getIn().getHeaders();
for (CloudEvent.Attribute attribute: ce.attributes()) {
- Object val = headers.remove(attribute.http());
+ Object val = headers.get(attribute.http());
if (val != null) {
headers.put(attribute.id(), val);
}
@@ -75,23 +76,31 @@ abstract class AbstractCloudEventProcessor implements
CloudEventProcessor {
final CloudEvent ce = cloudEvent();
return exchange -> {
- String eventType =
service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
- if (eventType == null) {
- eventType = endpoint.getConfiguration().getCloudEventsType();
+ final String contentType =
service.getMetadata().get(Knative.CONTENT_TYPE);
+ final Map<String, Object> headers =
exchange.getMessage().getHeaders();
+
+ for (CloudEvent.Attribute attribute: ce.attributes()) {
+ Object value = headers.get(attribute.id());
+ if (value != null) {
+ headers.putIfAbsent(attribute.http(), value);
+ }
}
- final String contentType =
service.getMetadata().get(Knative.CONTENT_TYPE);
- final ZonedDateTime created =
exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
- final String eventTime =
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
- final Map<String, Object> headers = exchange.getIn().getHeaders();
-
-
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(),
exchange.getExchangeId());
-
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(),
endpoint.getEndpointUri());
-
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(),
ce.version());
-
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(),
eventType);
-
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(),
eventTime);
headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+ setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_ID,
exchange::getExchangeId);
+ setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE,
endpoint::getEndpointUri);
+ setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_VERSION,
ce::version);
+ setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TYPE, ()
-> {
+ return
service.getMetadata().getOrDefault(Knative.KNATIVE_EVENT_TYPE,
endpoint.getConfiguration().getCloudEventsType());
+ });
+ setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TIME, ()
-> {
+ final ZonedDateTime created =
exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
+ final String eventTime =
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
+
+ return eventTime;
+ });
+
for (Map.Entry<String, String> entry:
service.getMetadata().entrySet()) {
if
(entry.getKey().startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) {
final String key =
entry.getKey().substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length());
@@ -102,4 +111,8 @@ abstract class AbstractCloudEventProcessor implements
CloudEventProcessor {
}
};
}
+
+ protected void setCloudEventHeader(Map<String, Object> headers, String id,
Supplier<Object> supplier) {
+ headers.putIfAbsent(cloudEvent().mandatoryAttribute(id).http(),
supplier.get());
+ }
}