This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit c48348130fafe949a6478e0b8511b6c84af90d10
Author: lburgazzoli <[email protected]>
AuthorDate: Sun Jun 21 13:51:36 2020 +0200

    Add runtime support for Knative sinkbinding #365
---
 .../knative/KnativeSourceLoaderInterceptor.java    | 62 +++++++++++++-
 .../knative/KnativeSourceRoutesLoaderTest.java     | 95 ++++++++++++++++++++--
 .../src/test/resources/sources/routes.groovy       |  2 +-
 .../src/test/resources/sources/routes.java         |  7 +-
 .../src/test/resources/sources/routes.js           |  2 +-
 .../src/test/resources/sources/routes.kts          |  2 +-
 .../src/test/resources/sources/routes.xml          |  2 +-
 .../component/knative/spi/KnativeEnvironment.java  | 39 +++++++++
 .../camel/component/knative/KnativeEndpoint.java   |  2 +-
 9 files changed, 189 insertions(+), 24 deletions(-)

diff --git 
a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
 
b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
index bee157b..07f2113 100644
--- 
a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
+++ 
b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
@@ -16,17 +16,25 @@
  */
 package org.apache.camel.k.loader.knative;
 
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.camel.CamelContext;
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.k.Source;
 import org.apache.camel.k.SourceLoader;
 import org.apache.camel.k.annotation.LoaderInterceptor;
 import org.apache.camel.k.support.RuntimeSupport;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.ToDefinition;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,16 +54,22 @@ public class KnativeSourceLoaderInterceptor implements 
SourceLoader.Interceptor
                 return RuntimeSupport.afterConfigure(result.builder(), builder 
-> {
                     final CamelContext camelContext = builder.getContext();
                     final List<RouteDefinition> definitions = 
builder.getRouteCollection().getRoutes();
+
                     if (definitions.size() == 1) {
-                        final String sink = 
camelContext.resolvePropertyPlaceholders("{{env:KNATIVE_SINK:sink}}");
-                        final String uri = 
String.format("knative://endpoint/%s", sink);
+                        final String sinkName = 
camelContext.resolvePropertyPlaceholders("{{knative.sink:sink}}");
+                        final String sinkUri = 
String.format("knative://endpoint/%s", sinkName);
                         final RouteDefinition definition = definitions.get(0);
 
-                        LOGGER.info("Add sink:{} to route:{}", uri, 
definition.getId());
+                        createSyntheticDefinition(camelContext, 
sinkName).ifPresent(serviceDefinition -> {
+                            // publish the synthetic service definition
+                            camelContext.getRegistry().bind(sinkName, 
serviceDefinition);
+                        });
+
+                        LOGGER.info("Add sink:{} to route:{}", sinkUri, 
definition.getId());
 
                         // assuming that route is linear like there's no 
content based routing
                         // or ant other EIP that would branch the flow
-                        definition.getOutputs().add(new ToDefinition(uri));
+                        definition.getOutputs().add(new ToDefinition(sinkUri));
                     } else {
                         LOGGER.warn("Cannot determine route to enrich. the 
knative enpoint need to explicitly be defined");
                     }
@@ -68,4 +82,44 @@ public class KnativeSourceLoaderInterceptor implements 
SourceLoader.Interceptor
             }
         };
     }
+
+    private static Optional<KnativeEnvironment.KnativeServiceDefinition> 
createSyntheticDefinition(
+            CamelContext camelContext,
+            String sinkName) {
+
+        final String kSinkUrl = 
camelContext.resolvePropertyPlaceholders("{{k.sink:}}");
+        final String kCeOverride = 
camelContext.resolvePropertyPlaceholders("{{k.ce.overrides:}}");
+
+        if (ObjectHelper.isNotEmpty(kSinkUrl)) {
+            // create a synthetic service definition to target the K_SINK url
+            var serviceBuilder = 
KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, sinkName)
+                .withMeta(Knative.CAMEL_ENDPOINT_KIND, 
Knative.EndpointKind.sink)
+                .withMeta(Knative.SERVICE_META_URL, kSinkUrl);
+
+            if (ObjectHelper.isNotEmpty(kCeOverride)) {
+                try (Reader reader = new StringReader(kCeOverride)) {
+                    // assume K_CE_OVERRIDES is defined as simple key/val json
+                    var overrides = Knative.MAPPER.readValue(
+                        reader,
+                        new TypeReference<HashMap<String, String>>() { }
+                    );
+
+                    for (var entry: overrides.entrySet()) {
+                        // generate proper ce-override meta-data for the 
service
+                        // definition
+                        serviceBuilder.withMeta(
+                            Knative.KNATIVE_CE_OVERRIDE_PREFIX + 
entry.getKey(),
+                            entry.getValue()
+                        );
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return Optional.of(serviceBuilder.build());
+        }
+
+        return Optional.empty();
+    }
 }
diff --git 
a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
 
b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
index e036299..7af4acd 100644
--- 
a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
+++ 
b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
@@ -18,7 +18,10 @@ package org.apache.camel.k.loader.knative;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Stream;
 
@@ -26,6 +29,9 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.knative.KnativeComponent;
+import org.apache.camel.component.knative.KnativeConstants;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.CloudEvents;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -74,11 +80,8 @@ public class KnativeSourceRoutesLoaderTest {
             KnativeEnvironment.endpoint(Knative.EndpointKind.sink, "sink", 
"localhost", runtime.port)
         ));
 
-
         CamelContext context = runtime.getCamelContext();
-        context.disableJMX();
-        context.setStreamCaching(true);
-        context.addComponent("knative", component);
+        context.addComponent(KnativeConstants.SCHEME, component);
 
         Source source = Sources.fromURI(uri);
         SourceLoader loader = RoutesConfigurer.load(runtime, source);
@@ -112,11 +115,82 @@ public class KnativeSourceRoutesLoaderTest {
             mock.expectedMessageCount(1);
             mock.expectedBodiesReceived(data);
 
-            context.createProducerTemplate().sendBodyAndHeader(
-                "direct:start",
-                "",
-                "MyHeader",
-                data);
+            context.createFluentProducerTemplate()
+                .to("direct:start")
+                .withHeader("MyHeader", data)
+                .send();
+
+            mock.assertIsSatisfied();
+        } finally {
+            context.stop();
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void testWrapLoaderWithSyntheticServiceDefinition(String uri) 
throws Exception {
+        LOGGER.info("uri: {}", uri);
+
+        final String data = UUID.randomUUID().toString();
+        final TestRuntime runtime = new TestRuntime();
+        final String typeHeaderKey = 
CloudEvents.v1_0.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+        final String typeHeaderVal = UUID.randomUUID().toString();
+        final String url = String.format("http://localhost:%d";, runtime.port);
+
+        KnativeComponent component = new KnativeComponent();
+        component.setEnvironment(new 
KnativeEnvironment(Collections.emptyList()));
+
+        Properties properties = new Properties();
+        properties.put("knative.sink", "mySynk");
+        properties.put("k.sink", String.format("http://localhost:%d";, 
runtime.port));
+        properties.put("k.ce.overrides", 
Knative.MAPPER.writeValueAsString(Map.of(typeHeaderKey, typeHeaderVal)));
+
+        CamelContext context = runtime.getCamelContext();
+        context.getPropertiesComponent().setInitialProperties(properties);
+        context.addComponent(KnativeConstants.SCHEME, component);
+
+        Source source = Sources.fromURI(uri);
+        SourceLoader loader = RoutesConfigurer.load(runtime, source);
+
+        
assertThat(loader.getSupportedLanguages()).contains(source.getLanguage());
+        assertThat(runtime.builders).hasSize(1);
+
+        try {
+            context.addRoutes(runtime.builders.get(0));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    fromF("platform-http:/")
+                        .routeId("http")
+                        .to("mock:result");
+                }
+            });
+            context.start();
+
+            var definitions = 
context.adapt(ModelCamelContext.class).getRouteDefinitions();
+            var services = 
context.getRegistry().findByType(KnativeEnvironment.KnativeServiceDefinition.class);
+
+            assertThat(definitions).hasSize(2);
+            assertThat(definitions).first().satisfies(d -> {
+                assertThat(d.getOutputs()).last().hasFieldOrPropertyWithValue(
+                    "endpointUri",
+                    "knative://endpoint/mySynk"
+                );
+            });
+
+            assertThat(services).hasSize(1);
+            assertThat(services).first().hasFieldOrPropertyWithValue("name", 
"mySynk");
+            assertThat(services).first().hasFieldOrPropertyWithValue("url", 
url);
+
+            MockEndpoint mock = context.getEndpoint("mock:result", 
MockEndpoint.class);
+            mock.expectedMessageCount(1);
+            mock.expectedBodiesReceived(data);
+            mock.expectedHeaderReceived(typeHeaderKey, typeHeaderVal);
+
+            context.createFluentProducerTemplate()
+                .to("direct:start")
+                .withHeader("MyHeader", data)
+                .send();
 
             mock.assertIsSatisfied();
         } finally {
@@ -131,6 +205,9 @@ public class KnativeSourceRoutesLoaderTest {
 
         public TestRuntime() {
             this.camelContext = new DefaultCamelContext();
+            this.camelContext.disableJMX();
+            this.camelContext.setStreamCaching(true);
+
             this.builders = new ArrayList<>();
             this.port = AvailablePortFinder.getNextAvailable();
 
diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.groovy 
b/camel-k-runtime-knative/src/test/resources/sources/routes.groovy
index d4e7b78..6f1b3ce 100644
--- a/camel-k-runtime-knative/src/test/resources/sources/routes.groovy
+++ b/camel-k-runtime-knative/src/test/resources/sources/routes.groovy
@@ -15,5 +15,5 @@
  * limitations under the License.
  */
 from('direct:start')
-    .setBody().simple('${header[MyHeader]}')
+    .setBody().header('MyHeader')
     .to('log:knative')
\ No newline at end of file
diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.java 
b/camel-k-runtime-knative/src/test/resources/sources/routes.java
index ffa38de..14eb9e9 100644
--- a/camel-k-runtime-knative/src/test/resources/sources/routes.java
+++ b/camel-k-runtime-knative/src/test/resources/sources/routes.java
@@ -21,12 +21,7 @@ public class MyRoutes extends RouteBuilder {
     @Override
     public void configure() throws Exception {
         from("direct:start")
-            .setBody().simple("${header[MyHeader]}")
+            .setBody().header("MyHeader")
             .to("log:knative");
     }
-
-    @BindToRegistry("my-bean")
-    public static String myBean() {
-        return "my-bean-string";
-    }
 }
\ No newline at end of file
diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.js 
b/camel-k-runtime-knative/src/test/resources/sources/routes.js
index 75de4de..041e045 100644
--- a/camel-k-runtime-knative/src/test/resources/sources/routes.js
+++ b/camel-k-runtime-knative/src/test/resources/sources/routes.js
@@ -15,5 +15,5 @@
  * limitations under the License.
  */
 from('direct:start')
-    .setBody().simple('${header[MyHeader]}')
+    .setBody().header('MyHeader')
     .to('log:knative');
\ No newline at end of file
diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.kts 
b/camel-k-runtime-knative/src/test/resources/sources/routes.kts
index 48b8b1c..54cc88c 100644
--- a/camel-k-runtime-knative/src/test/resources/sources/routes.kts
+++ b/camel-k-runtime-knative/src/test/resources/sources/routes.kts
@@ -15,6 +15,6 @@
  * limitations under the License.
  */
 from("direct:start")
-    .setBody().simple("\${header[MyHeader]}")
+    .setBody().header("MyHeader")
     .to("log:knative")
 
diff --git a/camel-k-runtime-knative/src/test/resources/sources/routes.xml 
b/camel-k-runtime-knative/src/test/resources/sources/routes.xml
index f0d953a..a7daa07 100644
--- a/camel-k-runtime-knative/src/test/resources/sources/routes.xml
+++ b/camel-k-runtime-knative/src/test/resources/sources/routes.xml
@@ -21,7 +21,7 @@
   <route>
     <from uri="direct:start"/>
     <setBody>
-      <simple>${header[MyHeader]}</simple>
+      <header>MyHeader</header>
     </setBody>
     <to uri="log:knative"/>
   </route>
diff --git 
a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
 
b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
index 2884dc9..5c9b7a8 100644
--- 
a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
+++ 
b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.knative.spi;
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringReader;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -263,11 +265,48 @@ public class KnativeEnvironment {
             );
         }
 
+        @Override
+        public String getHost() {
+            String urlAsString = getUrl();
+            if (urlAsString != null) {
+                try {
+                    return new URL(urlAsString).getHost();
+                } catch (MalformedURLException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return super.getHost();
+        }
+
+        @Override
+        public int getPort() {
+            String urlAsString = getUrl();
+            if (urlAsString != null) {
+                try {
+                    return new URL(urlAsString).getPort();
+                } catch (MalformedURLException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return super.getPort();
+        }
+
         public Knative.Type getType() {
             return 
Knative.Type.valueOf(getMetadata().get(Knative.KNATIVE_TYPE));
         }
 
         public String getPath() {
+            String urlAsString = getUrl();
+            if (urlAsString != null) {
+                try {
+                    return new URL(urlAsString).getPath();
+                } catch (MalformedURLException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
             return getMetadata(Knative.SERVICE_META_PATH);
         }
 
diff --git 
a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
 
b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 4b989bf..f02f1f5 100644
--- 
a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ 
b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -139,7 +139,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
         //
         KnativeEnvironment.KnativeServiceDefinition service = 
lookupServiceDefinition(serviceName, endpointKind)
             .or(() -> lookupServiceDefinition("default", endpointKind))
-            .orElseThrow(() -> new 
IllegalArgumentException(String.format("Unable to find a service definition for 
%s/%s/%s", type, serviceName, endpointKind)));
+            .orElseThrow(() -> new 
IllegalArgumentException(String.format("Unable to find a service definition for 
%s/%s/%s", type, endpointKind, serviceName)));
 
         final Map<String, String> metadata = new 
HashMap<>(service.getMetadata());
 

Reply via email to