This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit c48348130fafe949a6478e0b8511b6c84af90d10 Author: lburgazzoli <[email protected]> AuthorDate: Sun Jun 21 13:51:36 2020 +0200 Add runtime support for Knative sinkbinding #365 --- .../knative/KnativeSourceLoaderInterceptor.java | 62 +++++++++++++- .../knative/KnativeSourceRoutesLoaderTest.java | 95 ++++++++++++++++++++-- .../src/test/resources/sources/routes.groovy | 2 +- .../src/test/resources/sources/routes.java | 7 +- .../src/test/resources/sources/routes.js | 2 +- .../src/test/resources/sources/routes.kts | 2 +- .../src/test/resources/sources/routes.xml | 2 +- .../component/knative/spi/KnativeEnvironment.java | 39 +++++++++ .../camel/component/knative/KnativeEndpoint.java | 2 +- 9 files changed, 189 insertions(+), 24 deletions(-) diff --git a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java index bee157b..07f2113 100644 --- a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java +++ b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java @@ -16,17 +16,25 @@ */ package org.apache.camel.k.loader.knative; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.HashMap; import java.util.List; import java.util.Optional; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.camel.CamelContext; import org.apache.camel.RoutesBuilder; +import org.apache.camel.component.knative.spi.Knative; +import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.k.Source; import org.apache.camel.k.SourceLoader; import org.apache.camel.k.annotation.LoaderInterceptor; import org.apache.camel.k.support.RuntimeSupport; import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.ToDefinition; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,16 +54,22 @@ public class KnativeSourceLoaderInterceptor implements SourceLoader.Interceptor return RuntimeSupport.afterConfigure(result.builder(), builder -> { final CamelContext camelContext = builder.getContext(); final List<RouteDefinition> definitions = builder.getRouteCollection().getRoutes(); + if (definitions.size() == 1) { - final String sink = camelContext.resolvePropertyPlaceholders("{{env:KNATIVE_SINK:sink}}"); - final String uri = String.format("knative://endpoint/%s", sink); + final String sinkName = camelContext.resolvePropertyPlaceholders("{{knative.sink:sink}}"); + final String sinkUri = String.format("knative://endpoint/%s", sinkName); final RouteDefinition definition = definitions.get(0); - LOGGER.info("Add sink:{} to route:{}", uri, definition.getId()); + createSyntheticDefinition(camelContext, sinkName).ifPresent(serviceDefinition -> { + // publish the synthetic service definition + camelContext.getRegistry().bind(sinkName, serviceDefinition); + }); + + LOGGER.info("Add sink:{} to route:{}", sinkUri, definition.getId()); // assuming that route is linear like there's no content based routing // or ant other EIP that would branch the flow - definition.getOutputs().add(new ToDefinition(uri)); + definition.getOutputs().add(new ToDefinition(sinkUri)); } else { LOGGER.warn("Cannot determine route to enrich. the knative enpoint need to explicitly be defined"); } @@ -68,4 +82,44 @@ public class KnativeSourceLoaderInterceptor implements SourceLoader.Interceptor } }; } + + private static Optional<KnativeEnvironment.KnativeServiceDefinition> createSyntheticDefinition( + CamelContext camelContext, + String sinkName) { + + final String kSinkUrl = camelContext.resolvePropertyPlaceholders("{{k.sink:}}"); + final String kCeOverride = camelContext.resolvePropertyPlaceholders("{{k.ce.overrides:}}"); + + if (ObjectHelper.isNotEmpty(kSinkUrl)) { + // create a synthetic service definition to target the K_SINK url + var serviceBuilder = KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, sinkName) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withMeta(Knative.SERVICE_META_URL, kSinkUrl); + + if (ObjectHelper.isNotEmpty(kCeOverride)) { + try (Reader reader = new StringReader(kCeOverride)) { + // assume K_CE_OVERRIDES is defined as simple key/val json + var overrides = Knative.MAPPER.readValue( + reader, + new TypeReference<HashMap<String, String>>() { } + ); + + for (var entry: overrides.entrySet()) { + // generate proper ce-override meta-data for the service + // definition + serviceBuilder.withMeta( + Knative.KNATIVE_CE_OVERRIDE_PREFIX + entry.getKey(), + entry.getValue() + ); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + return Optional.of(serviceBuilder.build()); + } + + return Optional.empty(); + } } diff --git a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java index e036299..7af4acd 100644 --- a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java +++ b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java @@ -18,7 +18,10 @@ package org.apache.camel.k.loader.knative; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.UUID; import java.util.stream.Stream; @@ -26,6 +29,9 @@ import org.apache.camel.CamelContext; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.knative.KnativeComponent; +import org.apache.camel.component.knative.KnativeConstants; +import org.apache.camel.component.knative.spi.CloudEvent; +import org.apache.camel.component.knative.spi.CloudEvents; import org.apache.camel.component.knative.spi.Knative; import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.component.mock.MockEndpoint; @@ -74,11 +80,8 @@ public class KnativeSourceRoutesLoaderTest { KnativeEnvironment.endpoint(Knative.EndpointKind.sink, "sink", "localhost", runtime.port) )); - CamelContext context = runtime.getCamelContext(); - context.disableJMX(); - context.setStreamCaching(true); - context.addComponent("knative", component); + context.addComponent(KnativeConstants.SCHEME, component); Source source = Sources.fromURI(uri); SourceLoader loader = RoutesConfigurer.load(runtime, source); @@ -112,11 +115,82 @@ public class KnativeSourceRoutesLoaderTest { mock.expectedMessageCount(1); mock.expectedBodiesReceived(data); - context.createProducerTemplate().sendBodyAndHeader( - "direct:start", - "", - "MyHeader", - data); + context.createFluentProducerTemplate() + .to("direct:start") + .withHeader("MyHeader", data) + .send(); + + mock.assertIsSatisfied(); + } finally { + context.stop(); + } + } + + @ParameterizedTest + @MethodSource("parameters") + public void testWrapLoaderWithSyntheticServiceDefinition(String uri) throws Exception { + LOGGER.info("uri: {}", uri); + + final String data = UUID.randomUUID().toString(); + final TestRuntime runtime = new TestRuntime(); + final String typeHeaderKey = CloudEvents.v1_0.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); + final String typeHeaderVal = UUID.randomUUID().toString(); + final String url = String.format("http://localhost:%d", runtime.port); + + KnativeComponent component = new KnativeComponent(); + component.setEnvironment(new KnativeEnvironment(Collections.emptyList())); + + Properties properties = new Properties(); + properties.put("knative.sink", "mySynk"); + properties.put("k.sink", String.format("http://localhost:%d", runtime.port)); + properties.put("k.ce.overrides", Knative.MAPPER.writeValueAsString(Map.of(typeHeaderKey, typeHeaderVal))); + + CamelContext context = runtime.getCamelContext(); + context.getPropertiesComponent().setInitialProperties(properties); + context.addComponent(KnativeConstants.SCHEME, component); + + Source source = Sources.fromURI(uri); + SourceLoader loader = RoutesConfigurer.load(runtime, source); + + assertThat(loader.getSupportedLanguages()).contains(source.getLanguage()); + assertThat(runtime.builders).hasSize(1); + + try { + context.addRoutes(runtime.builders.get(0)); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("platform-http:/") + .routeId("http") + .to("mock:result"); + } + }); + context.start(); + + var definitions = context.adapt(ModelCamelContext.class).getRouteDefinitions(); + var services = context.getRegistry().findByType(KnativeEnvironment.KnativeServiceDefinition.class); + + assertThat(definitions).hasSize(2); + assertThat(definitions).first().satisfies(d -> { + assertThat(d.getOutputs()).last().hasFieldOrPropertyWithValue( + "endpointUri", + "knative://endpoint/mySynk" + ); + }); + + assertThat(services).hasSize(1); + assertThat(services).first().hasFieldOrPropertyWithValue("name", "mySynk"); + assertThat(services).first().hasFieldOrPropertyWithValue("url", url); + + MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(data); + mock.expectedHeaderReceived(typeHeaderKey, typeHeaderVal); + + context.createFluentProducerTemplate() + .to("direct:start") + .withHeader("MyHeader", data) + .send(); mock.assertIsSatisfied(); } finally { @@ -131,6 +205,9 @@ public class KnativeSourceRoutesLoaderTest { public TestRuntime() { this.camelContext = new DefaultCamelContext(); + this.camelContext.disableJMX(); + this.camelContext.setStreamCaching(true); + this.builders = new ArrayList<>(); this.port = AvailablePortFinder.getNextAvailable(); diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.groovy b/camel-k-runtime-knative/src/test/resources/sources/routes.groovy index d4e7b78..6f1b3ce 100644 --- a/camel-k-runtime-knative/src/test/resources/sources/routes.groovy +++ b/camel-k-runtime-knative/src/test/resources/sources/routes.groovy @@ -15,5 +15,5 @@ * limitations under the License. */ from('direct:start') - .setBody().simple('${header[MyHeader]}') + .setBody().header('MyHeader') .to('log:knative') \ No newline at end of file diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.java b/camel-k-runtime-knative/src/test/resources/sources/routes.java index ffa38de..14eb9e9 100644 --- a/camel-k-runtime-knative/src/test/resources/sources/routes.java +++ b/camel-k-runtime-knative/src/test/resources/sources/routes.java @@ -21,12 +21,7 @@ public class MyRoutes extends RouteBuilder { @Override public void configure() throws Exception { from("direct:start") - .setBody().simple("${header[MyHeader]}") + .setBody().header("MyHeader") .to("log:knative"); } - - @BindToRegistry("my-bean") - public static String myBean() { - return "my-bean-string"; - } } \ No newline at end of file diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.js b/camel-k-runtime-knative/src/test/resources/sources/routes.js index 75de4de..041e045 100644 --- a/camel-k-runtime-knative/src/test/resources/sources/routes.js +++ b/camel-k-runtime-knative/src/test/resources/sources/routes.js @@ -15,5 +15,5 @@ * limitations under the License. */ from('direct:start') - .setBody().simple('${header[MyHeader]}') + .setBody().header('MyHeader') .to('log:knative'); \ No newline at end of file diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.kts b/camel-k-runtime-knative/src/test/resources/sources/routes.kts index 48b8b1c..54cc88c 100644 --- a/camel-k-runtime-knative/src/test/resources/sources/routes.kts +++ b/camel-k-runtime-knative/src/test/resources/sources/routes.kts @@ -15,6 +15,6 @@ * limitations under the License. */ from("direct:start") - .setBody().simple("\${header[MyHeader]}") + .setBody().header("MyHeader") .to("log:knative") diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.xml b/camel-k-runtime-knative/src/test/resources/sources/routes.xml index f0d953a..a7daa07 100644 --- a/camel-k-runtime-knative/src/test/resources/sources/routes.xml +++ b/camel-k-runtime-knative/src/test/resources/sources/routes.xml @@ -21,7 +21,7 @@ <route> <from uri="direct:start"/> <setBody> - <simple>${header[MyHeader]}</simple> + <header>MyHeader</header> </setBody> <to uri="log:knative"/> </route> diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java index 2884dc9..5c9b7a8 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java @@ -19,6 +19,8 @@ package org.apache.camel.component.knative.spi; import java.io.InputStream; import java.io.Reader; import java.io.StringReader; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -263,11 +265,48 @@ public class KnativeEnvironment { ); } + @Override + public String getHost() { + String urlAsString = getUrl(); + if (urlAsString != null) { + try { + return new URL(urlAsString).getHost(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + return super.getHost(); + } + + @Override + public int getPort() { + String urlAsString = getUrl(); + if (urlAsString != null) { + try { + return new URL(urlAsString).getPort(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + return super.getPort(); + } + public Knative.Type getType() { return Knative.Type.valueOf(getMetadata().get(Knative.KNATIVE_TYPE)); } public String getPath() { + String urlAsString = getUrl(); + if (urlAsString != null) { + try { + return new URL(urlAsString).getPath(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + return getMetadata(Knative.SERVICE_META_PATH); } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index 4b989bf..f02f1f5 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -139,7 +139,7 @@ public class KnativeEndpoint extends DefaultEndpoint { // KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(serviceName, endpointKind) .or(() -> lookupServiceDefinition("default", endpointKind)) - .orElseThrow(() -> new IllegalArgumentException(String.format("Unable to find a service definition for %s/%s/%s", type, serviceName, endpointKind))); + .orElseThrow(() -> new IllegalArgumentException(String.format("Unable to find a service definition for %s/%s/%s", type, endpointKind, serviceName))); final Map<String, String> metadata = new HashMap<>(service.getMetadata());
