This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new 177022c   CamelCloudEventXXX not converted to CE header #177
     new f4c290e  Merge pull request #178 from lburgazzoli/github-177
177022c is described below

commit 177022c3f2935f80d3d2ab59bdcf50407dc7faef
Author: lburgazzoli <[email protected]>
AuthorDate: Sun Oct 27 17:55:01 2019 +0100

     CamelCloudEventXXX not converted to CE header #177
---
 .../camel/component/knative/spi/CloudEvents.java   |   2 +-
 .../component/knative/http/KnativeHttpTest.java    | 120 ++++++++++++++++++++-
 .../knative/ce/AbstractCloudEventProcessor.java    |  41 ++++---
 3 files changed, 143 insertions(+), 20 deletions(-)

diff --git 
a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
 
b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
index dd8ab96..6118ef0 100644
--- 
a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
+++ 
b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
@@ -34,7 +34,7 @@ public enum CloudEvents implements CloudEvent {
             Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "CE-EventID", 
"eventID"),
             Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME, 
"CE-EventTime", "eventTime"),
             Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL, 
"CE-SchemaURL", "schemaURL"),
-            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, 
"ContentType", "contentType"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, 
"Content-Type", "contentType"),
             Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_EXTENSIONS, 
"CE-Extensions", "extensions")
         )
     )),
diff --git 
a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
 
b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 9628838..390efd4 100644
--- 
a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ 
b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -262,7 +262,6 @@ public class KnativeHttpTest {
             public void configure() throws Exception {
                 from("knative:endpoint/myEndpoint")
                     .to("mock:ce");
-
                 from("direct:source")
                     .toF("undertow:http://localhost:%d/a/path";, port);
             }
@@ -272,11 +271,16 @@ public class KnativeHttpTest {
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
         
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(),
 ce.version());
+        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(),
 ce.version());
         
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(),
 "org.apache.camel.event");
+        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(),
 "org.apache.camel.event");
         
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(),
 "myEventID");
+        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(),
 "myEventID");
         
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(),
 "/somewhere");
+        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(),
 "/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
         mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -918,8 +922,7 @@ public class KnativeHttpTest {
             public void configure() throws Exception {
                 from("direct:source")
                     .to("knative:event/myEvent?kind=MyObject&apiVersion=v1");
-
-                fromF("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
+                from("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
                     .to("mock:ce");
             }
         });
@@ -1370,8 +1373,6 @@ public class KnativeHttpTest {
         }
     }
 
-
-
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
@@ -1434,5 +1435,114 @@ public class KnativeHttpTest {
             server.stop();
         }
     }
+
+    @ParameterizedTest
+    @MethodSource("provideCloudEventsImplementations")
+    void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws 
Exception {
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.sink,
+                "ep",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )
+            )
+        );
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+        Undertow server = Undertow.builder()
+            .addHttpListener(port, "localhost")
+            .setHandler(se -> {
+                exchange.set(se);
+                latch.countDown();
+            })
+            .build();
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("myType")
+                .to("knative:endpoint/ep");
+        });
+
+        context.start();
+        try {
+            server.start();
+            template.sendBody("direct:start", "");
+
+            latch.await();
+
+            HeaderMap headers = exchange.get().getRequestHeaders();
+
+            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType");
+            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        }  finally {
+            server.stop();
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideCloudEventsImplementations")
+    void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws 
Exception {
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.sink,
+                "ep",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )
+            )
+        );
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+        Undertow server = Undertow.builder()
+            .addHttpListener(port, "localhost")
+            .setHandler(se -> {
+                exchange.set(se);
+                latch.countDown();
+            })
+            .build();
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()).constant("fromCEHeader")
+                
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("fromCamelHeader")
+                .to("knative:endpoint/ep");
+        });
+
+        context.start();
+        try {
+            server.start();
+            template.sendBody("direct:start", "");
+
+            latch.await();
+
+            HeaderMap headers = exchange.get().getRequestHeaders();
+
+            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader");
+            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        }  finally {
+            server.stop();
+        }
+    }
 }
 
diff --git 
a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
 
b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index 4cd638a..9a4e550 100644
--- 
a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
+++ 
b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -22,6 +22,7 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -55,7 +56,7 @@ abstract class AbstractCloudEventProcessor implements 
CloudEventProcessor {
                 final Map<String, Object> headers = 
exchange.getIn().getHeaders();
 
                 for (CloudEvent.Attribute attribute: ce.attributes()) {
-                    Object val = headers.remove(attribute.http());
+                    Object val = headers.get(attribute.http());
                     if (val != null) {
                         headers.put(attribute.id(), val);
                     }
@@ -75,23 +76,31 @@ abstract class AbstractCloudEventProcessor implements 
CloudEventProcessor {
         final CloudEvent ce = cloudEvent();
 
         return exchange -> {
-            String eventType = 
service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
-            if (eventType == null) {
-                eventType = endpoint.getConfiguration().getCloudEventsType();
+            final String contentType = 
service.getMetadata().get(Knative.CONTENT_TYPE);
+            final Map<String, Object> headers = 
exchange.getMessage().getHeaders();
+
+            for (CloudEvent.Attribute attribute: ce.attributes()) {
+                Object value = headers.get(attribute.id());
+                if (value != null) {
+                    headers.putIfAbsent(attribute.http(), value);
+                }
             }
 
-            final String contentType = 
service.getMetadata().get(Knative.CONTENT_TYPE);
-            final ZonedDateTime created = 
exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
-            final String eventTime = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
-            final Map<String, Object> headers = exchange.getIn().getHeaders();
-
-            
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(),
 exchange.getExchangeId());
-            
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(),
 endpoint.getEndpointUri());
-            
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(),
 ce.version());
-            
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(),
 eventType);
-            
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(),
 eventTime);
             headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
 
+            setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_ID, 
exchange::getExchangeId);
+            setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, 
endpoint::getEndpointUri);
+            setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_VERSION, 
ce::version);
+            setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TYPE, () 
-> {
+                return 
service.getMetadata().getOrDefault(Knative.KNATIVE_EVENT_TYPE, 
endpoint.getConfiguration().getCloudEventsType());
+            });
+            setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TIME, () 
-> {
+                final ZonedDateTime created = 
exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
+                final String eventTime = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
+
+                return eventTime;
+            });
+
             for (Map.Entry<String, String> entry: 
service.getMetadata().entrySet()) {
                 if 
(entry.getKey().startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) {
                     final String key = 
entry.getKey().substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length());
@@ -102,4 +111,8 @@ abstract class AbstractCloudEventProcessor implements 
CloudEventProcessor {
             }
         };
     }
+
+    protected void setCloudEventHeader(Map<String, Object> headers, String id, 
Supplier<Object> supplier) {
+        headers.putIfAbsent(cloudEvent().mandatoryAttribute(id).http(), 
supplier.get());
+    }
 }

Reply via email to