This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit e8896bc3ab473086efd504d52542414b03a67b01 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Thu Jun 16 17:26:26 2022 +0200 CAMEL-18128: allow determining the desirable resume cache fill policy --- .../kafka/SingleNodeKafkaResumeStrategy.java | 10 +++++++++ .../java/org/apache/camel/resume/Cacheable.java | 24 ++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index 71d8ad26781..4de3f0acc7a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -199,6 +199,16 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka * @throws Exception */ public void loadCache() throws Exception { + if (adapter instanceof Cacheable) { + Cacheable cacheable = (Cacheable) adapter; + + if (cacheable.getFillPolicy() == Cacheable.FillPolicy.MAXIMIZING) { + consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } else { + consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + } + } + createConsumer(); subscribe(); diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java index 157d3689fe0..e11337c4464 100644 --- a/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java +++ b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java @@ -24,6 +24,21 @@ import org.apache.camel.resume.cache.ResumeCache; */ public interface Cacheable { + /** + * The cache fill policy can be used to determine how this cache should be filled with data. + */ + enum FillPolicy { + /** + * With MAXIMIZING, entities try to maximize cache usage and fill it with as much data as possible + */ + MAXIMIZING, + + /** + * With MINIMIZING, entities should fill it with as little data as reasonable. + */ + MINIMIZING, + } + /** * Adds an offset key and value to the cache * @@ -46,4 +61,13 @@ public interface Cacheable { * @return A resume cache instance */ ResumeCache<?> getCache(); + + /** + * Gets the {@Link FillPolicy} for this cache instance + * + * @return the fill policy set for this instance FillPolicy.MAXIMIZING + */ + default FillPolicy getFillPolicy() { + return FillPolicy.MAXIMIZING; + } }
