This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push:
new c5ac5aa Fix #365: change sinkbinding to be a customizer unrelated to the knative-source loader
new dd0b7a4 Merge pull request #411 from nicolaferraro/sink-binding
c5ac5aa is described below
commit c5ac5aae64d8b4f04028cc5f554fc22dc8a8d378
Author: Nicola Ferraro <[email protected]>
AuthorDate: Fri Jul 24 19:34:54 2020 +0200
Fix #365: change sinkbinding to be a customizer unrelated to the knative-source loader
---
.../KnativeSinkBindingContextCustomizer.java | 133 +++++++++++++++++++
.../knative/KnativeSourceLoaderInterceptor.java | 52 --------
.../KnativeSinkBindingCustomizerTest.java} | 144 +++++++--------------
.../knative/KnativeSourceRoutesLoaderTest.java | 72 -----------
.../apache/camel/component/knative/knative.json | 2 +-
5 files changed, 180 insertions(+), 223 deletions(-)
diff --git a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java
new file mode 100644
index 0000000..051195e
--- /dev/null
+++ b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.knative.customizer;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Optional;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.k.ContextCustomizer;
+import org.apache.camel.k.annotation.Customizer;
+import org.apache.camel.util.ObjectHelper;
+
+@Customizer("sinkbinding")
+public class KnativeSinkBindingContextCustomizer implements ContextCustomizer {
+
+ private String name;
+
+ private Knative.Type type;
+
+ private String kind;
+
+ private String apiVersion;
+
+ @Override
+ public void apply(CamelContext camelContext) {
+ createSyntheticDefinition(camelContext,
name).ifPresent(serviceDefinition -> {
+ // publish the synthetic service definition
+ camelContext.getRegistry().bind(name, serviceDefinition);
+ });
+ }
+
+ private Optional<KnativeEnvironment.KnativeServiceDefinition>
createSyntheticDefinition(
+ CamelContext camelContext,
+ String sinkName) {
+
+ final String kSinkUrl =
camelContext.resolvePropertyPlaceholders("{{k.sink:}}");
+ final String kCeOverride =
camelContext.resolvePropertyPlaceholders("{{k.ce.overrides:}}");
+
+ if (ObjectHelper.isNotEmpty(kSinkUrl)) {
+ // create a synthetic service definition to target the K_SINK url
+ var serviceBuilder = KnativeEnvironment.serviceBuilder(type,
sinkName)
+ .withMeta(Knative.CAMEL_ENDPOINT_KIND,
Knative.EndpointKind.sink)
+ .withMeta(Knative.SERVICE_META_URL, kSinkUrl);
+
+ if (ObjectHelper.isNotEmpty(kind)) {
+ serviceBuilder = serviceBuilder.withMeta(Knative.KNATIVE_KIND,
kind);
+ }
+
+ if (ObjectHelper.isNotEmpty(apiVersion)) {
+ serviceBuilder =
serviceBuilder.withMeta(Knative.KNATIVE_API_VERSION, apiVersion);
+ }
+
+ if (ObjectHelper.isNotEmpty(kCeOverride)) {
+ try (Reader reader = new StringReader(kCeOverride)) {
+ // assume K_CE_OVERRIDES is defined as simple key/val json
+ var overrides = Knative.MAPPER.readValue(
+ reader,
+ new TypeReference<HashMap<String, String>>() {
+ }
+ );
+
+ for (var entry : overrides.entrySet()) {
+ // generate proper ce-override meta-data for the
service
+ // definition
+ serviceBuilder.withMeta(
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX +
entry.getKey(),
+ entry.getValue()
+ );
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return Optional.of(serviceBuilder.build());
+ }
+
+ return Optional.empty();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Knative.Type getType() {
+ return type;
+ }
+
+ public void setType(Knative.Type type) {
+ this.type = type;
+ }
+
+ public String getKind() {
+ return kind;
+ }
+
+ public void setKind(String kind) {
+ this.kind = kind;
+ }
+
+ public String getApiVersion() {
+ return apiVersion;
+ }
+
+ public void setApiVersion(String apiVersion) {
+ this.apiVersion = apiVersion;
+ }
+
+}
diff --git a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
index 07f2113..ef303b8 100644
--- a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
+++ b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
@@ -16,25 +16,17 @@
*/
package org.apache.camel.k.loader.knative;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
-import java.util.HashMap;
import java.util.List;
import java.util.Optional;
-import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
-import org.apache.camel.component.knative.spi.Knative;
-import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.k.Source;
import org.apache.camel.k.SourceLoader;
import org.apache.camel.k.annotation.LoaderInterceptor;
import org.apache.camel.k.support.RuntimeSupport;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.ToDefinition;
-import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,11 +52,6 @@ public class KnativeSourceLoaderInterceptor implements SourceLoader.Interceptor
final String sinkUri =
String.format("knative://endpoint/%s", sinkName);
final RouteDefinition definition = definitions.get(0);
- createSyntheticDefinition(camelContext,
sinkName).ifPresent(serviceDefinition -> {
- // publish the synthetic service definition
- camelContext.getRegistry().bind(sinkName,
serviceDefinition);
- });
-
LOGGER.info("Add sink:{} to route:{}", sinkUri,
definition.getId());
// assuming that route is linear like there's no
content based routing
@@ -83,43 +70,4 @@ public class KnativeSourceLoaderInterceptor implements SourceLoader.Interceptor
};
}
- private static Optional<KnativeEnvironment.KnativeServiceDefinition>
createSyntheticDefinition(
- CamelContext camelContext,
- String sinkName) {
-
- final String kSinkUrl =
camelContext.resolvePropertyPlaceholders("{{k.sink:}}");
- final String kCeOverride =
camelContext.resolvePropertyPlaceholders("{{k.ce.overrides:}}");
-
- if (ObjectHelper.isNotEmpty(kSinkUrl)) {
- // create a synthetic service definition to target the K_SINK url
- var serviceBuilder =
KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, sinkName)
- .withMeta(Knative.CAMEL_ENDPOINT_KIND,
Knative.EndpointKind.sink)
- .withMeta(Knative.SERVICE_META_URL, kSinkUrl);
-
- if (ObjectHelper.isNotEmpty(kCeOverride)) {
- try (Reader reader = new StringReader(kCeOverride)) {
- // assume K_CE_OVERRIDES is defined as simple key/val json
- var overrides = Knative.MAPPER.readValue(
- reader,
- new TypeReference<HashMap<String, String>>() { }
- );
-
- for (var entry: overrides.entrySet()) {
- // generate proper ce-override meta-data for the
service
- // definition
- serviceBuilder.withMeta(
- Knative.KNATIVE_CE_OVERRIDE_PREFIX +
entry.getKey(),
- entry.getValue()
- );
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- return Optional.of(serviceBuilder.build());
- }
-
- return Optional.empty();
- }
}
diff --git a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingCustomizerTest.java
similarity index 56%
copy from camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
copy to camel-k-runtime-knative/src/test/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingCustomizerTest.java
index 7af4acd..7dab46d 100644
--- a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
+++ b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingCustomizerTest.java
@@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.k.loader.knative;
+package org.apache.camel.k.knative.customizer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -23,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
-import java.util.stream.Stream;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
@@ -42,94 +42,46 @@ import org.apache.camel.k.SourceLoader;
import org.apache.camel.k.Sources;
import org.apache.camel.k.http.PlatformHttpServiceContextCustomizer;
import org.apache.camel.k.listener.RoutesConfigurer;
+import org.apache.camel.k.support.RuntimeSupport;
import org.apache.camel.k.test.AvailablePortFinder;
-import org.apache.camel.model.ModelCamelContext;
-import org.apache.camel.model.RouteDefinition;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
-public class KnativeSourceRoutesLoaderTest {
- private static final Logger LOGGER =
LoggerFactory.getLogger(KnativeSourceRoutesLoaderTest.class);
-
- static Stream<Arguments> parameters() {
- return Stream.of(
-
Arguments.arguments("classpath:sources/routes.yaml?interceptors=knative-source"),
-
Arguments.arguments("classpath:sources/routes.xml?interceptors=knative-source"),
-
Arguments.arguments("classpath:sources/routes.groovy?interceptors=knative-source"),
-
Arguments.arguments("classpath:sources/routes.kts?interceptors=knative-source"),
-
Arguments.arguments("classpath:sources/routes.js?interceptors=knative-source"),
-
Arguments.arguments("classpath:sources/routes.java?name=MyRoutes.java&interceptors=knative-source")
- ).sequential();
- }
-
- @ParameterizedTest
- @MethodSource("parameters")
- public void testWrapLoader(String uri) throws Exception {
- LOGGER.info("uri: {}", uri);
-
- final String data = UUID.randomUUID().toString();
- final TestRuntime runtime = new TestRuntime();
-
- KnativeComponent component = new KnativeComponent();
- component.setEnvironment(KnativeEnvironment.on(
- KnativeEnvironment.endpoint(Knative.EndpointKind.sink, "sink",
"localhost", runtime.port)
- ));
-
- CamelContext context = runtime.getCamelContext();
- context.addComponent(KnativeConstants.SCHEME, component);
-
- Source source = Sources.fromURI(uri);
- SourceLoader loader = RoutesConfigurer.load(runtime, source);
-
-
assertThat(loader.getSupportedLanguages()).contains(source.getLanguage());
- assertThat(runtime.builders).hasSize(1);
-
- try {
- context.addRoutes(runtime.builders.get(0));
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- fromF("platform-http:/")
- .routeId("http")
- .to("mock:result");
- }
- });
- context.start();
-
- List<RouteDefinition> definitions =
context.adapt(ModelCamelContext.class).getRouteDefinitions();
-
- assertThat(definitions).hasSize(2);
- assertThat(definitions).first().satisfies(d -> {
- assertThat(d.getOutputs()).last().hasFieldOrPropertyWithValue(
- "endpointUri",
- "knative://endpoint/sink"
- );
+public class KnativeSinkBindingCustomizerTest {
+
+ @Test
+ public void testSinkBindingRegistration() throws Exception {
+ Runtime runtime = Runtime.on(new DefaultCamelContext());
+ runtime.setProperties(
+ "k.sink", "http://theurl",
+ "camel.k.customizer.sinkbinding.enabled", "true",
+ "camel.k.customizer.sinkbinding.name", "mychannel",
+ "camel.k.customizer.sinkbinding.type", "channel",
+ "camel.k.customizer.sinkbinding.kind", "InMemoryChannel",
+ "camel.k.customizer.sinkbinding.api-version",
"messaging.knative.dev/v1beta1");
+
+
+
assertThat(RuntimeSupport.configureContextCustomizers(runtime)).hasOnlyOneElementSatisfying(customizer
-> {
+
assertThat(customizer).isInstanceOfSatisfying(KnativeSinkBindingContextCustomizer.class,
sc -> {
+ assertThat(sc.getName()).isEqualTo("mychannel");
+ assertThat(sc.getType()).isEqualTo(Knative.Type.channel);
+
assertThat(sc.getApiVersion()).isEqualTo("messaging.knative.dev/v1beta1");
+ assertThat(sc.getKind()).isEqualTo("InMemoryChannel");
});
- MockEndpoint mock = context.getEndpoint("mock:result",
MockEndpoint.class);
- mock.expectedMessageCount(1);
- mock.expectedBodiesReceived(data);
-
- context.createFluentProducerTemplate()
- .to("direct:start")
- .withHeader("MyHeader", data)
- .send();
-
- mock.assertIsSatisfied();
- } finally {
- context.stop();
- }
+ var svc = runtime.getRegistry().lookupByNameAndType("mychannel",
KnativeEnvironment.KnativeServiceDefinition.class);
+ assertThat(svc).isNotNull();
+ assertThat(svc.getUrl()).isEqualTo("http://theurl");
+ assertThat(svc.getName()).isEqualTo("mychannel");
+ assertThat(svc.getType()).isEqualTo(Knative.Type.channel);
+
assertThat(svc.getMetadata(Knative.KNATIVE_API_VERSION)).isEqualTo("messaging.knative.dev/v1beta1");
+
assertThat(svc.getMetadata(Knative.KNATIVE_KIND)).isEqualTo("InMemoryChannel");
+ });
}
- @ParameterizedTest
- @MethodSource("parameters")
- public void testWrapLoaderWithSyntheticServiceDefinition(String uri)
throws Exception {
- LOGGER.info("uri: {}", uri);
+ @Test
+ public void testWrapLoaderWithSyntheticServiceDefinition() throws
Exception {
final String data = UUID.randomUUID().toString();
final TestRuntime runtime = new TestRuntime();
@@ -141,7 +93,9 @@ public class KnativeSourceRoutesLoaderTest {
component.setEnvironment(new
KnativeEnvironment(Collections.emptyList()));
Properties properties = new Properties();
- properties.put("knative.sink", "mySynk");
+ properties.put("camel.k.customizer.sinkbinding.enabled", "true");
+ properties.put("camel.k.customizer.sinkbinding.name", "mySynk");
+ properties.put("camel.k.customizer.sinkbinding.type", "endpoint");
properties.put("k.sink", String.format("http://localhost:%d",
runtime.port));
properties.put("k.ce.overrides",
Knative.MAPPER.writeValueAsString(Map.of(typeHeaderKey, typeHeaderVal)));
@@ -149,7 +103,9 @@ public class KnativeSourceRoutesLoaderTest {
context.getPropertiesComponent().setInitialProperties(properties);
context.addComponent(KnativeConstants.SCHEME, component);
- Source source = Sources.fromURI(uri);
+ RuntimeSupport.configureContextCustomizers(runtime);
+
+ Source source = Sources.fromBytes("groovy",
"from('direct:start').setBody().header('MyHeader').to('knative://endpoint/mySynk')".getBytes(StandardCharsets.UTF_8));
SourceLoader loader = RoutesConfigurer.load(runtime, source);
assertThat(loader.getSupportedLanguages()).contains(source.getLanguage());
@@ -161,23 +117,14 @@ public class KnativeSourceRoutesLoaderTest {
@Override
public void configure() throws Exception {
fromF("platform-http:/")
- .routeId("http")
- .to("mock:result");
+ .routeId("http")
+ .to("mock:result");
}
});
context.start();
- var definitions =
context.adapt(ModelCamelContext.class).getRouteDefinitions();
var services =
context.getRegistry().findByType(KnativeEnvironment.KnativeServiceDefinition.class);
- assertThat(definitions).hasSize(2);
- assertThat(definitions).first().satisfies(d -> {
- assertThat(d.getOutputs()).last().hasFieldOrPropertyWithValue(
- "endpointUri",
- "knative://endpoint/mySynk"
- );
- });
-
assertThat(services).hasSize(1);
assertThat(services).first().hasFieldOrPropertyWithValue("name",
"mySynk");
assertThat(services).first().hasFieldOrPropertyWithValue("url",
url);
@@ -188,9 +135,9 @@ public class KnativeSourceRoutesLoaderTest {
mock.expectedHeaderReceived(typeHeaderKey, typeHeaderVal);
context.createFluentProducerTemplate()
- .to("direct:start")
- .withHeader("MyHeader", data)
- .send();
+ .to("direct:start")
+ .withHeader("MyHeader", data)
+ .send();
mock.assertIsSatisfied();
} finally {
@@ -234,4 +181,5 @@ public class KnativeSourceRoutesLoaderTest {
public void setPropertiesLocations(Collection<String> locations) {
}
}
+
}
diff --git a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
index 7af4acd..513005e 100644
--- a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
+++ b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
@@ -126,78 +126,6 @@ public class KnativeSourceRoutesLoaderTest {
}
}
- @ParameterizedTest
- @MethodSource("parameters")
- public void testWrapLoaderWithSyntheticServiceDefinition(String uri)
throws Exception {
- LOGGER.info("uri: {}", uri);
-
- final String data = UUID.randomUUID().toString();
- final TestRuntime runtime = new TestRuntime();
- final String typeHeaderKey =
CloudEvents.v1_0.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
- final String typeHeaderVal = UUID.randomUUID().toString();
- final String url = String.format("http://localhost:%d", runtime.port);
-
- KnativeComponent component = new KnativeComponent();
- component.setEnvironment(new
KnativeEnvironment(Collections.emptyList()));
-
- Properties properties = new Properties();
- properties.put("knative.sink", "mySynk");
- properties.put("k.sink", String.format("http://localhost:%d",
runtime.port));
- properties.put("k.ce.overrides",
Knative.MAPPER.writeValueAsString(Map.of(typeHeaderKey, typeHeaderVal)));
-
- CamelContext context = runtime.getCamelContext();
- context.getPropertiesComponent().setInitialProperties(properties);
- context.addComponent(KnativeConstants.SCHEME, component);
-
- Source source = Sources.fromURI(uri);
- SourceLoader loader = RoutesConfigurer.load(runtime, source);
-
-
assertThat(loader.getSupportedLanguages()).contains(source.getLanguage());
- assertThat(runtime.builders).hasSize(1);
-
- try {
- context.addRoutes(runtime.builders.get(0));
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- fromF("platform-http:/")
- .routeId("http")
- .to("mock:result");
- }
- });
- context.start();
-
- var definitions =
context.adapt(ModelCamelContext.class).getRouteDefinitions();
- var services =
context.getRegistry().findByType(KnativeEnvironment.KnativeServiceDefinition.class);
-
- assertThat(definitions).hasSize(2);
- assertThat(definitions).first().satisfies(d -> {
- assertThat(d.getOutputs()).last().hasFieldOrPropertyWithValue(
- "endpointUri",
- "knative://endpoint/mySynk"
- );
- });
-
- assertThat(services).hasSize(1);
- assertThat(services).first().hasFieldOrPropertyWithValue("name",
"mySynk");
- assertThat(services).first().hasFieldOrPropertyWithValue("url",
url);
-
- MockEndpoint mock = context.getEndpoint("mock:result",
MockEndpoint.class);
- mock.expectedMessageCount(1);
- mock.expectedBodiesReceived(data);
- mock.expectedHeaderReceived(typeHeaderKey, typeHeaderVal);
-
- context.createFluentProducerTemplate()
- .to("direct:start")
- .withHeader("MyHeader", data)
- .send();
-
- mock.assertIsSatisfied();
- } finally {
- context.stop();
- }
- }
-
static class TestRuntime implements Runtime {
private final CamelContext camelContext;
private final List<RoutesBuilder> builders;
diff --git a/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json b/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json
index 5d81d06..4954d47 100644
--- a/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json
+++ b/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json
@@ -11,7 +11,7 @@
"supportLevel": "Preview",
"groupId": "org.apache.camel.k",
"artifactId": "camel-knative",
- "version": "1.4.1-SNAPSHOT",
+ "version": "1.5.0-SNAPSHOT",
"scheme": "knative",
"extendsScheme": "",
"syntax": "knative:type\/name",