This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 55c843177abf732a0156b9046cb6cdea55142e69
Author: Luca Burgazzoli <[email protected]>
AuthorDate: Fri Sep 18 12:53:18 2020 +0200

    loaders: simplify source loader interceptors
---
 .../org/apache/camel/k/support/RuntimeSupport.java | 41 ---------------
 .../org/apache/camel/k/support/SourcesSupport.java | 34 +++++++++++++
 .../camel/k/cron/CronSourceLoaderInterceptor.java  | 58 ++++++++++++----------
 .../knative/KnativeSourceLoaderInterceptor.java    | 43 +++++++++-------
 4 files changed, 89 insertions(+), 87 deletions(-)

diff --git a/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/RuntimeSupport.java b/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/RuntimeSupport.java
index 4c9da08..72e9796 100644
--- a/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/RuntimeSupport.java
+++ b/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/RuntimeSupport.java
@@ -19,20 +19,15 @@ package org.apache.camel.k.support;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.RoutesBuilder;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.RouteBuilderLifecycleStrategy;
 import org.apache.camel.k.Constants;
 import org.apache.camel.k.ContextCustomizer;
 import org.apache.camel.k.Source;
@@ -252,40 +247,4 @@ public final class RuntimeSupport {
         return answer;
     }
 
-    public static Optional<RoutesBuilder> beforeConfigure(Optional<RoutesBuilder> builder, Consumer<RouteBuilder> consumer) {
-        return builder.map(b -> {
-            if (b instanceof RouteBuilder) {
-                ((RouteBuilder) b).addLifecycleInterceptor(beforeConfigure(consumer));
-            }
-            return b;
-        });
-    }
-
-    public static RouteBuilderLifecycleStrategy beforeConfigure(Consumer<RouteBuilder> consumer) {
-        return new RouteBuilderLifecycleStrategy() {
-            @Override
-            public void beforeConfigure(RouteBuilder builder) {
-                consumer.accept(builder);
-            }
-        };
-    }
-
-    public static Optional<RoutesBuilder> afterConfigure(Optional<RoutesBuilder> builder, Consumer<RouteBuilder> consumer) {
-        return builder.map(b -> {
-            if (b instanceof RouteBuilder) {
-                ((RouteBuilder) b).addLifecycleInterceptor(afterConfigure(consumer));
-            }
-            return b;
-        });
-    }
-
-    public static RouteBuilderLifecycleStrategy afterConfigure(Consumer<RouteBuilder> consumer) {
-        return new RouteBuilderLifecycleStrategy() {
-            @Override
-            public void afterConfigure(RouteBuilder builder) {
-                consumer.accept(builder);
-            }
-        };
-    }
-
 }
diff --git a/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/SourcesSupport.java b/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/SourcesSupport.java
index 3a87ef2..0482ec7 100644
--- a/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/SourcesSupport.java
+++ b/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/SourcesSupport.java
@@ -17,7 +17,9 @@
 package org.apache.camel.k.support;
 
 import java.util.List;
+import java.util.function.Consumer;
 
+import org.apache.camel.RoutesBuilder;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.builder.RouteBuilderLifecycleStrategy;
@@ -161,4 +163,36 @@ public final class SourcesSupport {
             }
         );
     }
+
+    public static RoutesBuilder beforeConfigure(RoutesBuilder builder, Consumer<RouteBuilder> consumer) {
+        if (builder instanceof RouteBuilder) {
+            ((RouteBuilder) builder).addLifecycleInterceptor(beforeConfigure(consumer));
+        }
+        return builder;
+    }
+
+    public static RouteBuilderLifecycleStrategy beforeConfigure(Consumer<RouteBuilder> consumer) {
+        return new RouteBuilderLifecycleStrategy() {
+            @Override
+            public void beforeConfigure(RouteBuilder builder) {
+                consumer.accept(builder);
+            }
+        };
+    }
+
+    public static RoutesBuilder afterConfigure(RoutesBuilder builder, Consumer<RouteBuilder> consumer) {
+        if (builder instanceof RouteBuilder) {
+            ((RouteBuilder) builder).addLifecycleInterceptor(afterConfigure(consumer));
+        }
+        return builder;
+    }
+
+    public static RouteBuilderLifecycleStrategy afterConfigure(Consumer<RouteBuilder> consumer) {
+        return new RouteBuilderLifecycleStrategy() {
+            @Override
+            public void afterConfigure(RouteBuilder builder) {
+                consumer.accept(builder);
+            }
+        };
+    }
 }
diff --git a/camel-k-runtime-cron/src/main/java/org/apache/camel/k/cron/CronSourceLoaderInterceptor.java b/camel-k-runtime-cron/src/main/java/org/apache/camel/k/cron/CronSourceLoaderInterceptor.java
index 3c965ac..cdbf409 100644
--- a/camel-k-runtime-cron/src/main/java/org/apache/camel/k/cron/CronSourceLoaderInterceptor.java
+++ b/camel-k-runtime-cron/src/main/java/org/apache/camel/k/cron/CronSourceLoaderInterceptor.java
@@ -20,12 +20,13 @@ import java.util.Optional;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.k.Runtime;
 import org.apache.camel.k.RuntimeAware;
 import org.apache.camel.k.Source;
 import org.apache.camel.k.SourceLoader;
 import org.apache.camel.k.annotation.LoaderInterceptor;
-import org.apache.camel.k.support.RuntimeSupport;
+import org.apache.camel.k.support.SourcesSupport;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.Configurer;
@@ -84,33 +85,10 @@ public class CronSourceLoaderInterceptor implements SourceLoader.Interceptor, Ru
         return new SourceLoader.Result() {
             @Override
             public Optional<RoutesBuilder> builder() {
-                return RuntimeSupport.afterConfigure(result.builder(), builder -> {
-                    if (ObjectHelper.isEmpty(overridableComponents)) {
-                        return;
-                    }
-
-                    final CamelContext context = runtime.getCamelContext();
-                    final String[] components = overridableComponents.split(",", -1);
-
-                    for (RouteDefinition def : builder.getRouteCollection().getRoutes()) {
-                        String uri = def.getInput() != null ? def.getInput().getUri() : null;
-                        if (shouldBeOverridden(uri, components)) {
-                            def.getInput().setUri(timerUri);
-
-                            //
-                            // Don't install the shutdown strategy more than once.
-                            //
-                            if (context.getManagementStrategy().getEventNotifiers().stream().noneMatch(CronShutdownStrategy.class::isInstance)) {
-                                CronShutdownStrategy strategy = new CronShutdownStrategy(runtime);
-                                ServiceHelper.startService(strategy);
-
-                                context.getManagementStrategy().addEventNotifier(strategy);
-                            }
-                        }
-                    }
-                });
+                return result.builder().map(
+                    builder -> SourcesSupport.afterConfigure(builder, CronSourceLoaderInterceptor.this::afterConfigure)
+                );
             }
-
             @Override
             public Optional<Object> configuration() {
                 return result.configuration();
@@ -118,6 +96,32 @@ public class CronSourceLoaderInterceptor implements SourceLoader.Interceptor, Ru
         };
     }
 
+    private void afterConfigure(RouteBuilder builder) {
+        if (ObjectHelper.isEmpty(overridableComponents)) {
+            return;
+        }
+
+        final CamelContext context = runtime.getCamelContext();
+        final String[] components = overridableComponents.split(",", -1);
+
+        for (RouteDefinition def : builder.getRouteCollection().getRoutes()) {
+            String uri = def.getInput() != null ? def.getInput().getUri() : null;
+            if (shouldBeOverridden(uri, components)) {
+                def.getInput().setUri(timerUri);
+
+                //
+                // Don't install the shutdown strategy more than once.
+                //
+                if (context.getManagementStrategy().getEventNotifiers().stream().noneMatch(CronShutdownStrategy.class::isInstance)) {
+                    CronShutdownStrategy strategy = new CronShutdownStrategy(runtime);
+                    ServiceHelper.startService(strategy);
+
+                    context.getManagementStrategy().addEventNotifier(strategy);
+                }
+            }
+        }
+    }
+
     private static boolean shouldBeOverridden(String uri, String... components) {
         if (uri == null) {
             return false;
diff --git a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
index 160399c..979fae7 100644
--- a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
+++ b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
@@ -21,10 +21,11 @@ import java.util.Optional;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.k.Source;
 import org.apache.camel.k.SourceLoader;
 import org.apache.camel.k.annotation.LoaderInterceptor;
-import org.apache.camel.k.support.RuntimeSupport;
+import org.apache.camel.k.support.SourcesSupport;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.ToDefinition;
 import org.slf4j.Logger;
@@ -44,24 +45,9 @@ public class KnativeSourceLoaderInterceptor implements SourceLoader.Interceptor
         return new SourceLoader.Result() {
             @Override
             public Optional<RoutesBuilder> builder() {
-                return RuntimeSupport.afterConfigure(result.builder(), builder -> {
-                    final CamelContext camelContext = builder.getContext();
-                    final List<RouteDefinition> definitions = builder.getRouteCollection().getRoutes();
-
-                    if (definitions.size() == 1) {
-                        final String sinkName = camelContext.resolvePropertyPlaceholders("{{knative.sink:sink}}");
-                        final String sinkUri = String.format("knative://endpoint/%s", sinkName);
-                        final RouteDefinition definition = definitions.get(0);
-
-                        LOGGER.info("Add sink:{} to route:{}", sinkUri, definition.getId());
-
-                        // assuming that route is linear like there's no content based routing
-                        // or ant other EIP that would branch the flow
-                        definition.getOutputs().add(new ToDefinition(sinkUri));
-                    } else {
-                        LOGGER.warn("Cannot determine route to enrich. the knative enpoint need to explicitly be defined");
-                    }
-                });
+                return result.builder().map(
+                    bulider -> SourcesSupport.afterConfigure(bulider, KnativeSourceLoaderInterceptor::afterConfigure)
+                );
             }
 
             @Override
@@ -71,4 +57,23 @@ public class KnativeSourceLoaderInterceptor implements SourceLoader.Interceptor
         };
     }
 
+    private static void afterConfigure(RouteBuilder builder) {
+        final CamelContext camelContext = builder.getContext();
+        final List<RouteDefinition> definitions = builder.getRouteCollection().getRoutes();
+
+        if (definitions.size() == 1) {
+            final String sinkName = camelContext.resolvePropertyPlaceholders("{{knative.sink:sink}}");
+            final String sinkUri = String.format("knative://endpoint/%s", sinkName);
+            final RouteDefinition definition = definitions.get(0);
+
+            LOGGER.info("Add sink:{} to route:{}", sinkUri, definition.getId());
+
+            // assuming that route is linear like there's no content based routing
+            // or ant other EIP that would branch the flow
+            definition.getOutputs().add(new ToDefinition(sinkUri));
+        } else {
+            LOGGER.warn("Cannot determine route to enrich. the knative enpoint need to explicitly be defined");
+        }
+    }
+
 }
