This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-examples.git
commit e97db54a6946ed28e0bca330a07d389e5d5ef798 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Wed Aug 10 16:27:31 2022 +0200 (chores) camel-resume-api: cleaned up the example with clustering --- .../clusterized/main/ClusterizedListener.java | 22 ++++------------------ .../ClusterizedLargeDirectoryRouteBuilder.java | 2 +- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java index 5d113d90..42471c14 100644 --- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java +++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java @@ -20,10 +20,9 @@ import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService; import org.apache.camel.example.resume.fileset.clusterized.strategies.ClusterizedLargeDirectoryRouteBuilder; +import org.apache.camel.example.resume.strategies.kafka.KafkaUtil; import org.apache.camel.main.BaseMainSupport; import org.apache.camel.main.MainListener; -import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration; -import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder; import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy; import org.apache.camel.resume.Resumable; import org.apache.camel.resume.ResumeStrategy; @@ -55,7 +54,7 @@ class ClusterizedListener implements MainListener { main.getCamelContext().addService(clusterService); LOG.trace("Creating the strategy"); - SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = newResumeStrategy(); + SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = KafkaUtil.getDefaultStrategy(); main.getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, resumeStrategy); LOG.trace("Creating the route"); @@ -89,20 +88,7 @@ class ClusterizedListener implements MainListener { @Override public void afterStop(BaseMainSupport main) { - main.shutdown(); - System.exit(0); - } - - private static SingleNodeKafkaResumeStrategy<Resumable> newResumeStrategy() { - String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092"); - String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets"); - - KafkaResumeStrategyConfiguration resumeStrategyConfiguration = - KafkaResumeStrategyConfigurationBuilder.newBuilder() - .withBootstrapServers(bootStrapAddress) - .withTopic(kafkaTopic) - .build(); - - return new SingleNodeKafkaResumeStrategy<>(resumeStrategyConfiguration); +// main.shutdown(); +// System.exit(0); } } diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java index 842e0d85..d7340907 100644 --- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java +++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java @@ -54,7 +54,7 @@ public class ClusterizedLargeDirectoryRouteBuilder extends RouteBuilder { .routeId("heartbeat") .log("HeartBeat route (timer) ..."); - from("master:resume-ns:file:{{input.dir}}?noop=true&recursive=true") + from("master:resume-ns:file:{{input.dir}}?noop=true&recursive=true&repeatCount=1") .resumable(ResumeStrategy.DEFAULT_NAME) .routeId("clustered") .process(this::process)
