This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit ff5069c0de9adaa6d123c1a81666ffcefe59d4dc Author: lburgazzoli <[email protected]> AuthorDate: Sun Jun 21 11:55:05 2020 +0200 camel-knative: add support for URL in knative environment #369 --- .../camel/component/knative/spi/Knative.java | 1 + .../component/knative/spi/KnativeEnvironment.java | 211 ++++++++++++--------- .../knative/http/KnativeHttpProducer.java | 31 +-- .../component/knative/http/KnativeHttpTest.java | 43 +++++ 4 files changed, 181 insertions(+), 105 deletions(-) diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java index 4d52932..a9dbbc0 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java @@ -37,6 +37,7 @@ public final class Knative { public static final String SERVICE_META_HOST = "service.host"; public static final String SERVICE_META_ZONE = "service.zone"; public static final String SERVICE_META_PATH = "service.path"; + public static final String SERVICE_META_URL = "service.url"; private Knative() { } diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java index 76af6de..2884dc9 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java @@ -21,7 +21,7 @@ import java.io.Reader; import java.io.StringReader; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -33,8 +33,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.impl.cloud.DefaultServiceDefinition; import org.apache.camel.support.ResourceHelper; -import static org.apache.camel.util.CollectionHelper.mapOf; - /* * Assuming it is loaded from a json for now */ @@ -70,7 +68,6 @@ public class KnativeEnvironment { public static KnativeEnvironment mandatoryLoadFromResource(CamelContext context, String path) throws Exception { try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(context, path)) { - // // read the knative environment from a file formatted as json, i.e. : // @@ -100,129 +97,151 @@ public class KnativeEnvironment { } public static KnativeServiceDefinition endpoint(Knative.EndpointKind endpointKind, String name, String host, int port) { - return entry( - endpointKind, - Knative.Type.endpoint, - name, - host, - port, - Collections.emptyMap() - ); + return serviceBuilder(Knative.Type.endpoint, name) + .withHost(host) + .withPort(port) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .build(); } public static KnativeServiceDefinition endpoint(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) { - return entry( - endpointKind, - Knative.Type.endpoint, - name, - host, - port, - metadata - ); + return serviceBuilder(Knative.Type.endpoint, name) + .withHost(host) + .withPort(port) + .withMeta(metadata) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .build(); } public static KnativeServiceDefinition sourceEndpoint(String name, Map<String, String> metadata) { - return entry( - Knative.EndpointKind.source, - Knative.Type.endpoint, - name, - null, - -1, - metadata - ); + return serviceBuilder(Knative.Type.endpoint, name) + .withMeta(metadata) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source.name()) + .build(); } public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind, String name, String host, int port) { - return entry( - endpointKind, - Knative.Type.channel, - name, - host, - port, - Collections.emptyMap() - ); + return serviceBuilder(Knative.Type.channel, name) + .withHost(host) + .withPort(port) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .build(); } public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) { - return entry( - endpointKind, - Knative.Type.channel, - name, - host, - port, - metadata - ); + return serviceBuilder(Knative.Type.channel, name) + .withHost(host) + .withPort(port) + .withMeta(metadata) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .build(); } public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port) { - return entry( - endpointKind, - Knative.Type.event, - name, - host, - port, - Collections.emptyMap() - ); + return serviceBuilder(Knative.Type.event, name) + .withHost(host) + .withPort(port) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .build(); } public static KnativeServiceDefinition sourceEvent(String name) { - return entry( - Knative.EndpointKind.source, - Knative.Type.event, - name, - null, - -1, - Collections.emptyMap() - ); + return serviceBuilder(Knative.Type.event, name) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source) + .build(); } public static KnativeServiceDefinition sourceEvent(String name, Map<String, String> metadata) { - return entry( - Knative.EndpointKind.source, - Knative.Type.event, - name, - null, - -1, - metadata - ); + return serviceBuilder(Knative.Type.event, name) + .withMeta(metadata) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source) + .build(); } public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) { - return entry( - endpointKind, - Knative.Type.event, - name, - host, - port, - metadata - ); - } - - public static KnativeServiceDefinition entry(Knative.EndpointKind endpointKind, Knative.Type type, String name, String host, int port, Map<String, String> metadata) { - return new KnativeEnvironment.KnativeServiceDefinition( - type, - name, - host, - port, - KnativeSupport.mergeMaps( - metadata, - mapOf( - Knative.CAMEL_ENDPOINT_KIND, endpointKind.name() - ) - ) - ); + return serviceBuilder(Knative.Type.event, name) + .withHost(host) + .withPort(port) + .withMeta(metadata) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .build(); } public static KnativeEnvironment on(KnativeServiceDefinition... definitions) { return new KnativeEnvironment(Arrays.asList(definitions)); } + public static KnativeServiceBuilder serviceBuilder(Knative.Type type, String name) { + return new KnativeServiceBuilder(type, name); + } + // ************************ // // Types // // ************************ + + public static final class KnativeServiceBuilder { + private final Knative.Type type; + private final String name; + private String host; + private Integer port; + private Map<String, String> metadata; + + public KnativeServiceBuilder(Knative.Type type, String name) { + this.type = type; + this.name = name; + } + + public KnativeServiceBuilder withHost(String host) { + this.host = host; + return this; + } + + public KnativeServiceBuilder withPort(Integer port) { + this.port = port; + return this; + } + + public KnativeServiceBuilder withMeta(Map<String, String> metadata) { + if (metadata == null) { + return this; + } + if (this.metadata == null) { + this.metadata = new HashMap<>(); + } + this.metadata.putAll(metadata); + return this; + } + + public KnativeServiceBuilder withMeta(String key, String value) { + if (key == null || value == null) { + return this; + } + if (this.metadata == null) { + this.metadata = new HashMap<>(); + } + this.metadata.put(key, value); + return this; + } + + public KnativeServiceBuilder withMeta(String key, Enum<?> e) { + if (key == null || e == null) { + return this; + } + if (this.metadata == null) { + this.metadata = new HashMap<>(); + } + this.metadata.put(key, e.name()); + return this; + } + + public KnativeServiceDefinition build() { + return new KnativeServiceDefinition(type, name, host, port, metadata); + } + } + public static final class KnativeServiceDefinition extends DefaultServiceDefinition { @JsonCreator public KnativeServiceDefinition( @@ -238,7 +257,7 @@ public class KnativeEnvironment { port == null ? -1 : port, KnativeSupport.mergeMaps( metadata, - mapOf( + Map.of( Knative.KNATIVE_TYPE, type.name()) ) ); @@ -249,7 +268,7 @@ public class KnativeEnvironment { } public String getPath() { - return getMetadata().get(Knative.SERVICE_META_PATH); + return getMetadata(Knative.SERVICE_META_PATH); } public String getPathOrDefault(String path) { @@ -257,13 +276,17 @@ public class KnativeEnvironment { } public String getEventType() { - return getMetadata().get(Knative.KNATIVE_EVENT_TYPE); + return getMetadata(Knative.KNATIVE_EVENT_TYPE); } public int getPortOrDefault(int port) { return getPort() != -1 ? getPort() : port; } + public String getUrl() { + return getMetadata(Knative.SERVICE_META_URL); + } + public String getMetadata(String key) { return getMetadata().get(key); } diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java index d707a51..010bfb3 100644 --- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java +++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.knative.http; import java.util.Map; +import java.util.function.Supplier; import io.vertx.core.MultiMap; import io.vertx.core.Vertx; @@ -38,6 +39,7 @@ import org.apache.camel.support.DefaultMessage; import org.apache.camel.support.MessageHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; +import org.apache.camel.util.function.Suppliers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +51,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { private final Vertx vertx; private final WebClientOptions clientOptions; private final HeaderFilterStrategy headerFilterStrategy; + private final Supplier<String> uri; private WebClient client; @@ -65,6 +68,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { this.vertx = ObjectHelper.notNull(vertx, "vertx"); this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new); this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy(); + this.uri = Suppliers.memorize(() -> computeUrl(serviceDefinition)); } @Override @@ -111,10 +115,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { return true; } - final int port = serviceDefinition.getPortOrDefault(KnativeHttpTransport.DEFAULT_PORT); - final String path = serviceDefinition.getPathOrDefault(KnativeHttpTransport.DEFAULT_PATH); - - client.post(port, serviceDefinition.getHost(), path) + client.postAbs(this.uri.get()) .putHeaders(headers) .sendBuffer(Buffer.buffer(payload), response -> { if (response.succeeded()) { @@ -136,7 +137,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { if (result.statusCode() < 200 || result.statusCode() >= 300) { String exceptionMessage = String.format( "HTTP operation failed invoking %s with statusCode: %d, statusMessage: %s", - URISupport.sanitizeUri(getURI()), + URISupport.sanitizeUri(this.uri.get()), result.statusCode(), result.statusMessage() ); @@ -148,7 +149,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { exchange.setMessage(answer); } else if (response.failed()) { - String exceptionMessage = "HTTP operation failed invoking " + URISupport.sanitizeUri(getURI()); + String exceptionMessage = "HTTP operation failed invoking " + URISupport.sanitizeUri(this.uri.get()); if (response.result() != null) { exceptionMessage += " with statusCode: " + response.result().statusCode(); } @@ -180,12 +181,20 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { } } - private String getURI() { - String p = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () -> KnativeHttpTransport.DEFAULT_PATH); - if (!p.startsWith("/")) { - p = "/" + p; + private static String computeUrl(KnativeEnvironment.KnativeServiceDefinition definition) { + String url = definition.getUrl(); + if (url == null) { + int port = definition.getPortOrDefault(KnativeHttpTransport.DEFAULT_PORT); + String path = definition.getPathOrDefault(KnativeHttpTransport.DEFAULT_PATH); + + if (!path.startsWith("/")) { + path = "/" + path; + } + + url = String.format("http://%s:%d%s", definition.getHost(), port, path); } - return String.format("http://%s:%d%s", serviceDefinition.getHost(), serviceDefinition.getPort(), p); + return url; } + } diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index edb81cc..a896c1c 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -234,6 +234,49 @@ public class KnativeHttpTest { @ParameterizedTest @EnumSource(CloudEvents.class) + void testInvokeEndpointByUrl(CloudEvent ce) throws Exception { + configureKnativeComponent( + context, + ce, + endpoint( + Knative.EndpointKind.sink, + "myEndpoint", + "none", + -1, + mapOf( + Knative.SERVICE_META_PATH, "/does/not/exist", + Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort), + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.CONTENT_TYPE, "text/plain" + )) + ); + + RouteBuilder.addRoutes(context, b -> { + b.from("direct:source") + .to("knative:endpoint/myEndpoint"); + b.from("platform-http:/a/path") + .to("mock:ce"); + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())); + mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); + + template.sendBody("direct:source", "test"); + + mock.assertIsSatisfied(); + } + + @ParameterizedTest + @EnumSource(CloudEvents.class) void testConsumeStructuredContent(CloudEvent ce) throws Exception { configureKnativeComponent(
