This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit ff5069c0de9adaa6d123c1a81666ffcefe59d4dc
Author: lburgazzoli <[email protected]>
AuthorDate: Sun Jun 21 11:55:05 2020 +0200

    camel-knative: add support for URL in knative environment #369
---
 .../camel/component/knative/spi/Knative.java       |   1 +
 .../component/knative/spi/KnativeEnvironment.java  | 211 ++++++++++++---------
 .../knative/http/KnativeHttpProducer.java          |  31 +--
 .../component/knative/http/KnativeHttpTest.java    |  43 +++++
 4 files changed, 181 insertions(+), 105 deletions(-)

diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
index 4d52932..a9dbbc0 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
@@ -37,6 +37,7 @@ public final class Knative {
     public static final String SERVICE_META_HOST = "service.host";
     public static final String SERVICE_META_ZONE = "service.zone";
     public static final String SERVICE_META_PATH = "service.path";
+    public static final String SERVICE_META_URL = "service.url";
 
     private Knative() {
     }
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
index 76af6de..2884dc9 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
@@ -21,7 +21,7 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -33,8 +33,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.impl.cloud.DefaultServiceDefinition;
 import org.apache.camel.support.ResourceHelper;
 
-import static org.apache.camel.util.CollectionHelper.mapOf;
-
 /*
  * Assuming it is loaded from a json for now
  */
@@ -70,7 +68,6 @@ public class KnativeEnvironment {
 
     public static KnativeEnvironment mandatoryLoadFromResource(CamelContext 
context, String path) throws Exception {
         try (InputStream is = 
ResourceHelper.resolveMandatoryResourceAsInputStream(context, path)) {
-
             //
             // read the knative environment from a file formatted as json, 
i.e. :
             //
@@ -100,129 +97,151 @@ public class KnativeEnvironment {
     }
 
     public static KnativeServiceDefinition endpoint(Knative.EndpointKind 
endpointKind, String name, String host, int port) {
-        return entry(
-            endpointKind,
-            Knative.Type.endpoint,
-            name,
-            host,
-            port,
-            Collections.emptyMap()
-        );
+        return serviceBuilder(Knative.Type.endpoint, name)
+            .withHost(host)
+            .withPort(port)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
+            .build();
     }
 
     public static KnativeServiceDefinition endpoint(Knative.EndpointKind 
endpointKind, String name, String host, int port, Map<String, String> metadata) 
{
-        return entry(
-            endpointKind,
-            Knative.Type.endpoint,
-            name,
-            host,
-            port,
-            metadata
-        );
+        return serviceBuilder(Knative.Type.endpoint, name)
+            .withHost(host)
+            .withPort(port)
+            .withMeta(metadata)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
+            .build();
     }
 
     public static KnativeServiceDefinition sourceEndpoint(String name, 
Map<String, String> metadata) {
-        return entry(
-            Knative.EndpointKind.source,
-            Knative.Type.endpoint,
-            name,
-            null,
-            -1,
-            metadata
-        );
+        return serviceBuilder(Knative.Type.endpoint, name)
+            .withMeta(metadata)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, 
Knative.EndpointKind.source.name())
+            .build();
     }
 
     public static KnativeServiceDefinition channel(Knative.EndpointKind 
endpointKind, String name, String host, int port) {
-        return entry(
-            endpointKind,
-            Knative.Type.channel,
-            name,
-            host,
-            port,
-            Collections.emptyMap()
-        );
+        return serviceBuilder(Knative.Type.channel, name)
+            .withHost(host)
+            .withPort(port)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
+            .build();
     }
 
     public static KnativeServiceDefinition channel(Knative.EndpointKind 
endpointKind, String name, String host, int port, Map<String, String> metadata) 
{
-        return entry(
-            endpointKind,
-            Knative.Type.channel,
-            name,
-            host,
-            port,
-            metadata
-        );
+        return serviceBuilder(Knative.Type.channel, name)
+            .withHost(host)
+            .withPort(port)
+            .withMeta(metadata)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
+            .build();
     }
 
     public static KnativeServiceDefinition event(Knative.EndpointKind 
endpointKind, String name, String host, int port) {
-        return entry(
-            endpointKind,
-            Knative.Type.event,
-            name,
-            host,
-            port,
-            Collections.emptyMap()
-        );
+        return serviceBuilder(Knative.Type.event, name)
+            .withHost(host)
+            .withPort(port)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
+            .build();
     }
 
     public static KnativeServiceDefinition sourceEvent(String name) {
-        return entry(
-            Knative.EndpointKind.source,
-            Knative.Type.event,
-            name,
-            null,
-            -1,
-            Collections.emptyMap()
-        );
+        return serviceBuilder(Knative.Type.event, name)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source)
+            .build();
     }
 
     public static KnativeServiceDefinition sourceEvent(String name, 
Map<String, String> metadata) {
-        return entry(
-            Knative.EndpointKind.source,
-            Knative.Type.event,
-            name,
-            null,
-            -1,
-            metadata
-        );
+        return serviceBuilder(Knative.Type.event, name)
+            .withMeta(metadata)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source)
+            .build();
     }
 
     public static KnativeServiceDefinition event(Knative.EndpointKind 
endpointKind, String name, String host, int port, Map<String, String> metadata) 
{
-        return entry(
-            endpointKind,
-            Knative.Type.event,
-            name,
-            host,
-            port,
-            metadata
-        );
-    }
-
-    public static KnativeServiceDefinition entry(Knative.EndpointKind 
endpointKind, Knative.Type type, String name, String host, int port, 
Map<String, String> metadata) {
-        return new KnativeEnvironment.KnativeServiceDefinition(
-            type,
-            name,
-            host,
-            port,
-            KnativeSupport.mergeMaps(
-                metadata,
-                mapOf(
-                    Knative.CAMEL_ENDPOINT_KIND, endpointKind.name()
-                )
-            )
-        );
+        return serviceBuilder(Knative.Type.event, name)
+            .withHost(host)
+            .withPort(port)
+            .withMeta(metadata)
+            .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
+            .build();
     }
 
     public static KnativeEnvironment on(KnativeServiceDefinition... 
definitions) {
         return new KnativeEnvironment(Arrays.asList(definitions));
     }
 
+    public static KnativeServiceBuilder serviceBuilder(Knative.Type type, 
String name) {
+        return new KnativeServiceBuilder(type, name);
+    }
+
     // ************************
     //
     // Types
     //
     // ************************
 
+
+    public static final class KnativeServiceBuilder {
+        private final Knative.Type type;
+        private final String name;
+        private String host;
+        private Integer port;
+        private Map<String, String> metadata;
+
+        public KnativeServiceBuilder(Knative.Type type, String name) {
+            this.type = type;
+            this.name = name;
+        }
+
+        public KnativeServiceBuilder withHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        public KnativeServiceBuilder withPort(Integer port) {
+            this.port = port;
+            return this;
+        }
+
+        public KnativeServiceBuilder withMeta(Map<String, String> metadata) {
+            if (metadata == null) {
+                return this;
+            }
+            if (this.metadata == null) {
+                this.metadata = new HashMap<>();
+            }
+            this.metadata.putAll(metadata);
+            return this;
+        }
+
+        public KnativeServiceBuilder withMeta(String key, String value) {
+            if (key == null || value == null) {
+                return this;
+            }
+            if (this.metadata == null) {
+                this.metadata = new HashMap<>();
+            }
+            this.metadata.put(key, value);
+            return this;
+        }
+
+        public KnativeServiceBuilder withMeta(String key, Enum<?> e) {
+            if (key == null || e == null) {
+                return this;
+            }
+            if (this.metadata == null) {
+                this.metadata = new HashMap<>();
+            }
+            this.metadata.put(key, e.name());
+            return this;
+        }
+
+        public KnativeServiceDefinition build() {
+            return new KnativeServiceDefinition(type, name, host, port, 
metadata);
+        }
+    }
+
     public static final class KnativeServiceDefinition extends 
DefaultServiceDefinition {
         @JsonCreator
         public KnativeServiceDefinition(
@@ -238,7 +257,7 @@ public class KnativeEnvironment {
                 port == null ? -1 : port,
                 KnativeSupport.mergeMaps(
                     metadata,
-                    mapOf(
+                    Map.of(
                         Knative.KNATIVE_TYPE, type.name())
                 )
             );
@@ -249,7 +268,7 @@ public class KnativeEnvironment {
         }
 
         public String getPath() {
-            return getMetadata().get(Knative.SERVICE_META_PATH);
+            return getMetadata(Knative.SERVICE_META_PATH);
         }
 
         public String getPathOrDefault(String path) {
@@ -257,13 +276,17 @@ public class KnativeEnvironment {
         }
 
         public String getEventType() {
-            return getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
+            return getMetadata(Knative.KNATIVE_EVENT_TYPE);
         }
 
         public int getPortOrDefault(int port) {
             return getPort() != -1 ? getPort() : port;
         }
 
+        public String getUrl() {
+            return getMetadata(Knative.SERVICE_META_URL);
+        }
+
         public String getMetadata(String key) {
             return getMetadata().get(key);
         }
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
index d707a51..010bfb3 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.knative.http;
 
 import java.util.Map;
+import java.util.function.Supplier;
 
 import io.vertx.core.MultiMap;
 import io.vertx.core.Vertx;
@@ -38,6 +39,7 @@ import org.apache.camel.support.DefaultMessage;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
+import org.apache.camel.util.function.Suppliers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +51,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer 
{
     private final Vertx vertx;
     private final WebClientOptions clientOptions;
     private final HeaderFilterStrategy headerFilterStrategy;
+    private final Supplier<String> uri;
 
     private WebClient client;
 
@@ -65,6 +68,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer 
{
         this.vertx = ObjectHelper.notNull(vertx, "vertx");
         this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, 
WebClientOptions::new);
         this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
+        this.uri = Suppliers.memorize(() -> computeUrl(serviceDefinition));
     }
 
     @Override
@@ -111,10 +115,7 @@ public class KnativeHttpProducer extends 
DefaultAsyncProducer {
             return true;
         }
 
-        final int port = 
serviceDefinition.getPortOrDefault(KnativeHttpTransport.DEFAULT_PORT);
-        final String path = 
serviceDefinition.getPathOrDefault(KnativeHttpTransport.DEFAULT_PATH);
-
-        client.post(port, serviceDefinition.getHost(), path)
+        client.postAbs(this.uri.get())
             .putHeaders(headers)
             .sendBuffer(Buffer.buffer(payload), response -> {
                 if (response.succeeded()) {
@@ -136,7 +137,7 @@ public class KnativeHttpProducer extends 
DefaultAsyncProducer {
                     if (result.statusCode() < 200 || result.statusCode() >= 
300) {
                         String exceptionMessage = String.format(
                             "HTTP operation failed invoking %s with 
statusCode: %d, statusMessage: %s",
-                            URISupport.sanitizeUri(getURI()),
+                            URISupport.sanitizeUri(this.uri.get()),
                             result.statusCode(),
                             result.statusMessage()
                         );
@@ -148,7 +149,7 @@ public class KnativeHttpProducer extends 
DefaultAsyncProducer {
 
                     exchange.setMessage(answer);
                 } else if (response.failed()) {
-                    String exceptionMessage = "HTTP operation failed invoking 
" + URISupport.sanitizeUri(getURI());
+                    String exceptionMessage = "HTTP operation failed invoking 
" + URISupport.sanitizeUri(this.uri.get());
                     if (response.result() != null) {
                         exceptionMessage += " with statusCode: " + 
response.result().statusCode();
                     }
@@ -180,12 +181,20 @@ public class KnativeHttpProducer extends 
DefaultAsyncProducer {
         }
     }
 
-    private String getURI() {
-        String p = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () 
-> KnativeHttpTransport.DEFAULT_PATH);
-        if (!p.startsWith("/")) {
-            p = "/" + p;
+    private static String 
computeUrl(KnativeEnvironment.KnativeServiceDefinition definition) {
+        String url = definition.getUrl();
+        if (url == null) {
+            int port = 
definition.getPortOrDefault(KnativeHttpTransport.DEFAULT_PORT);
+            String path = 
definition.getPathOrDefault(KnativeHttpTransport.DEFAULT_PATH);
+
+            if (!path.startsWith("/")) {
+                path = "/" + path;
+            }
+
+            url = String.format("http://%s:%d%s", definition.getHost(), port, path);
         }
 
-        return String.format("http://%s:%d%s", serviceDefinition.getHost(), serviceDefinition.getPort(), p);
+        return url;
     }
+
 }
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index edb81cc..a896c1c 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -234,6 +234,49 @@ public class KnativeHttpTest {
 
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
+    void testInvokeEndpointByUrl(CloudEvent ce) throws Exception {
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.sink,
+                "myEndpoint",
+                "none",
+                -1,
+                mapOf(
+                    Knative.SERVICE_META_PATH, "/does/not/exist",
+                    Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort),
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        );
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:source")
+                .to("knative:endpoint/myEndpoint");
+            b.from("platform-http:/a/path")
+                .to("mock:ce");
+        });
+
+        context.start();
+
+        MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(),
 ce.version());
+        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(),
 "org.apache.camel.event");
+        
mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(),
 "knative://endpoint/myEndpoint");
+        mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
+        mock.expectedMessagesMatches(e -> 
e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http()));
+        mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:source", "test");
+
+        mock.assertIsSatisfied();
+    }
+
+    @ParameterizedTest
+    @EnumSource(CloudEvents.class)
     void testConsumeStructuredContent(CloudEvent ce) throws Exception {
 
         configureKnativeComponent(

Reply via email to