This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c05c928da1b CAMEL-18127: implement adapter auto-configuration in the
resume API
c05c928da1b is described below
commit c05c928da1b831c51c5d4617bbf81e0165b2bcf6
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon May 30 11:10:17 2022 +0200
CAMEL-18127: implement adapter auto-configuration in the resume API
---
.../org/apache/camel/resume/adapter.properties | 18 ++
.../resume/{single => }/CaffeineCache.java | 61 +++++--
.../caffeine/resume/multi/CaffeineCache.java | 98 -----------
.../integration/ConsumeResumeStrategyIT.java | 2 +-
.../couchdb/consumer/CouchDbResumable.java | 15 +-
.../apache/camel/component/file/FileConsumer.java | 28 +++-
.../apache/camel/component/file/GenericFile.java | 10 +-
...ble.java => DirectoryEntriesResumeAdapter.java} | 6 +-
...meAdapter.java => FileOffsetResumeAdapter.java} | 18 +-
.../component/file/consumer/FileResumeAdapter.java | 8 +-
.../adapters/AbstractFileResumeAdapter.java} | 37 +++--
.../DefaultDirectoryEntriesResumeAdapter.java | 75 +++++++++
.../adapters/DefaultFileOffsetResumeAdapter.java | 95 +++++++++++
.../adapters/DefaultFileSetResumeAdapter.java | 65 --------
.../adapters/DefaultGenericFileResumeAdapter.java | 64 --------
.../DirectoryEntries.java} | 33 ++--
.../file/consumer/adapters/FileOffset.java} | 31 ++--
.../adapters/FileResumeAdapterDelegate.java | 92 +++++++++++
.../FileSet.java} | 47 ++++--
.../org/apache/camel/resume/adapter.properties | 18 ++
.../kafka/consumer/support/KafkaResumable.java | 15 +-
.../resume/kafka/KafkaResumeStrategy.java | 5 +-
.../resume/kafka/MultiNodeKafkaResumeStrategy.java | 84 +++++-----
.../kafka/SingleNodeKafkaResumeStrategy.java | 159 +++++++++++-------
.../KafkaConsumerWithResumeRouteStrategyIT.java | 16 +-
.../resume/{ResumeStrategy.java => Cacheable.java} | 32 ++--
.../org/apache/camel/resume/Deserializable.java | 56 +++++++
.../main/java/org/apache/camel/resume/Offset.java | 14 +-
.../SingleEntryCache.java => OffsetKey.java} | 29 ++--
.../java/org/apache/camel/resume/Resumable.java | 21 +--
.../java/org/apache/camel/resume/ResumableSet.java | 81 ---------
.../org/apache/camel/resume/ResumeAdapter.java | 3 +
.../org/apache/camel/resume/ResumeStrategy.java | 15 ++
.../java/org/apache/camel/resume/Serializable.java | 82 ++++++++++
.../resume/UpdatableConsumerResumeStrategy.java | 4 +-
.../apache/camel/resume/cache/MultiEntryCache.java | 29 ----
.../org/apache/camel/resume/cache/ResumeCache.java | 46 +++++-
.../org/apache/camel/impl/engine/DefaultRoute.java | 6 +-
.../docs/modules/eips/pages/resume-strategies.adoc | 51 +-----
.../processor/resume/DelegatingResumeAdapter.java | 181 ---------------------
.../processor/resume/ResumableCompletion.java | 14 +-
.../processor/resume/TransientResumeStrategy.java | 10 ++
.../FileConsumerResumeFromOffsetStrategyTest.java | 53 ++++--
.../file/FileConsumerResumeStrategyTest.java | 36 ++--
.../apache/camel/support/resume/AdapterHelper.java | 112 +++++++++++++
.../apache/camel/support/resume/OffsetKeys.java | 109 +++++++++++++
.../org/apache/camel/support/resume/Offsets.java | 24 ++-
.../apache/camel/support/resume/Resumables.java | 59 ++++++-
48 files changed, 1279 insertions(+), 888 deletions(-)
diff --git
a/components/camel-atom/src/main/resources/org/apache/camel/resume/adapter.properties
b/components/camel-atom/src/main/resources/org/apache/camel/resume/adapter.properties
new file mode 100644
index 00000000000..3504fd3a39f
--- /dev/null
+++
b/components/camel-atom/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+adapterClass=org.apache.camel.component.atom.UpdatedDateFilter
diff --git
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
similarity index 58%
rename from
components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
rename to
components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
index d551418cf38..5ec66ddc88b 100644
---
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
+++
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
@@ -15,22 +15,23 @@
* limitations under the License.
*/
-package org.apache.camel.component.caffeine.resume.single;
+package org.apache.camel.component.caffeine.resume;
import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.camel.resume.cache.SingleEntryCache;
+import org.apache.camel.resume.cache.ResumeCache;
/**
* This is a simple cache implementation that uses Caffeine to store the
resume offsets
*
* @param <K> The type of the key to cache
- * @param <V> The type of the value/entry to cache
*/
-public class CaffeineCache<K, V> implements SingleEntryCache<K, V> {
- private final Cache<K, V> cache;
+public class CaffeineCache<K> implements ResumeCache<K> {
+ private final Cache<K, Object> cache;
private final long cacheSize;
/**
@@ -48,27 +49,27 @@ public class CaffeineCache<K, V> implements
SingleEntryCache<K, V> {
* @param cache an instance of a pre-constructed cache object
* @param cacheSize the size of the pre-constructed cache object
*/
- public CaffeineCache(Cache<K, V> cache, long cacheSize) {
+ public CaffeineCache(Cache<K, Object> cache, long cacheSize) {
this.cache = cache;
this.cacheSize = cacheSize;
}
@Override
- public boolean contains(K key, V entry) {
+ public boolean contains(K key, Object entry) {
assert key != null;
- V cachedEntry = cache.getIfPresent(key);
+ Object cachedEntry = cache.getIfPresent(key);
return entry.equals(cachedEntry);
}
@Override
- public void add(K key, V offsetValue) {
+ public void add(K key, Object offsetValue) {
cache.put(key, offsetValue);
}
@Override
- public Optional<V> get(K key) {
- V entry = cache.getIfPresent(key);
+ public Object get(K key) {
+ Object entry = cache.getIfPresent(key);
if (entry == null) {
return Optional.empty();
@@ -78,12 +79,42 @@ public class CaffeineCache<K, V> implements
SingleEntryCache<K, V> {
}
@Override
- public boolean isFull() {
- if (cache.estimatedSize() >= cacheSize) {
- return true;
+ public <T> T get(K key, Class<T> clazz) {
+ Object entry = cache.getIfPresent(key);
+ if (entry != null) {
+ return clazz.cast(entry);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Object computeIfAbsent(K key, Function<? super K, ? super Object>
mapping) {
+ Object entry = cache.getIfPresent(key);
+
+ if (entry == null) {
+ entry = mapping.apply(key);
+ cache.put(key, entry);
+ }
+
+ return entry;
+ }
+
+ @Override
+ public Object computeIfPresent(K key, BiFunction<? super K, ? super
Object, ? super Object> remapping) {
+ Object entry = cache.getIfPresent(key);
+
+ if (entry != null) {
+ entry = remapping.apply(key, entry);
+ cache.put(key, entry);
}
- return false;
+ return entry;
+ }
+
+ @Override
+ public boolean isFull() {
+ return cache.estimatedSize() >= cacheSize;
}
@Override
diff --git
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
deleted file mode 100644
index e775be5d063..00000000000
---
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.caffeine.resume.multi;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.camel.resume.cache.MultiEntryCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A cache that can store multiple key/valued resumables where the value is a
list containing multiple values
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public class CaffeineCache<K, V> implements MultiEntryCache<K, V> {
- private static final Logger LOG =
LoggerFactory.getLogger(CaffeineCache.class);
-
- private final Cache<K, List<V>> cache;
- private final long cacheSize;
-
- /**
- * Builds a new instance of this object with the given cache size
- *
- * @param cacheSize the size of the cache
- */
- public CaffeineCache(long cacheSize) {
- this(Caffeine.newBuilder().maximumSize(cacheSize).build(), cacheSize);
- }
-
- /**
- * Builds a new instance of this object
- *
- * @param cache an instance of a pre-constructed cache object
- * @param cacheSize the size of the pre-constructed cache object
- */
- public CaffeineCache(Cache<K, List<V>> cache, long cacheSize) {
- this.cache = cache;
- this.cacheSize = cacheSize;
- }
-
- @Override
- public long capacity() {
- return cacheSize;
- }
-
- @Override
- public synchronized void add(K key, V offsetValue) {
- LOG.trace("Adding entry to the cache (k/v): {}/{}", key, offsetValue);
- LOG.trace("Adding entry to the cache (k/v) with types: {}/{}",
key.getClass(), offsetValue.getClass());
- List<V> entries = cache.get(key, k -> new ArrayList<>());
-
- entries.add(offsetValue);
- }
-
- @Override
- public synchronized boolean contains(K key, V entry) {
- final List<V> entries = cache.getIfPresent(key);
-
- if (entries == null) {
- return false;
- }
-
- boolean ret = entries.contains(entry);
- LOG.trace("Checking if cache contains key {} with value {} ({})", key,
entry, ret);
-
- return ret;
- }
-
- @Override
- public boolean isFull() {
- if (cache.estimatedSize() >= cacheSize) {
- return true;
- }
-
- return false;
- }
-
-}
diff --git
a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
index 8925c5459bd..cacab287d9d 100644
---
a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
+++
b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
@@ -50,7 +50,7 @@ public class ConsumeResumeStrategyIT extends
CouchbaseIntegrationTestBase {
}
}
- TransientResumeStrategy resumeStrategy = new TransientResumeStrategy(new
TestCouchbaseResumeAdapter());
+ private final TransientResumeStrategy resumeStrategy = new
TransientResumeStrategy(new TestCouchbaseResumeAdapter());
@Test
public void testQueryForBeers() throws Exception {
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
index df8d2e10917..bd81936e55d 100644
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
+++
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
@@ -19,13 +19,15 @@ package org.apache.camel.component.couchdb.consumer;
import org.apache.camel.component.couchdb.CouchDbClientWrapper;
import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
+import org.apache.camel.support.resume.OffsetKeys;
import org.apache.camel.support.resume.Offsets;
/**
* Wraps the resume data for CouchDb
*/
-public class CouchDbResumable implements Resumable<String, String> {
+public class CouchDbResumable implements Resumable {
private final CouchDbClientWrapper clientWrapper;
private String offset;
@@ -34,7 +36,6 @@ public class CouchDbResumable implements Resumable<String,
String> {
this.offset = offset;
}
- @Override
public void updateLastOffset(String offset) {
this.offset = offset;
}
@@ -44,11 +45,6 @@ public class CouchDbResumable implements Resumable<String,
String> {
return Offsets.of(offset);
}
- @Override
- public String getAddressable() {
- return null;
- }
-
/**
* Gets the client wrapper. Fine for local access, but should be
restricted for global access on the API
*
@@ -57,4 +53,9 @@ public class CouchDbResumable implements Resumable<String,
String> {
CouchDbClientWrapper getClientWrapper() {
return clientWrapper;
}
+
+ @Override
+ public OffsetKey<?> getOffsetKey() {
+ return OffsetKeys.empty();
+ }
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 9d730a528a4..95f16969813 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -31,10 +31,9 @@ import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.component.file.consumer.FileResumeAdapter;
-import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
+import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
@@ -107,8 +106,9 @@ public class FileConsumer extends GenericFileConsumer<File>
implements ResumeAwa
if (resumeStrategy != null) {
ResumeAdapter adapter = resumeStrategy.getAdapter();
- if (adapter instanceof GenericFileResumeAdapter) {
- ((FileResumeAdapter) adapter).resume(gf);
+ if (adapter instanceof FileOffsetResumeAdapter) {
+ ((FileOffsetResumeAdapter) adapter).setResumePayload(gf);
+ adapter.resume();
}
}
@@ -178,10 +178,11 @@ public class FileConsumer extends
GenericFileConsumer<File> implements ResumeAwa
if (resumeStrategy != null) {
ResumeAdapter adapter = resumeStrategy.getAdapter();
- if (adapter instanceof FileSetResumeAdapter) {
- FileResumeSet resumeSet = new FileResumeSet(dirFiles);
+ if (adapter instanceof DirectoryEntriesResumeAdapter) {
+ DirectoryEntries resumeSet = new DirectoryEntries(directory,
dirFiles);
- ((FileResumeAdapter) adapter).resume(resumeSet);
+ ((DirectoryEntriesResumeAdapter)
adapter).setResumePayload(resumeSet);
+ adapter.resume();
return resumeSet.resumed();
}
@@ -315,6 +316,15 @@ public class FileConsumer extends
GenericFileConsumer<File> implements ResumeAwa
return
!file.getFile().getAbsolutePath().equals(file.getAbsoluteFilePath());
}
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ if (resumeStrategy != null) {
+ resumeStrategy.loadCache();
+ }
+ }
+
@Override
public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
index 42d52d5cc2f..25249ac20b8 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.WrappedFile;
-import org.apache.camel.component.file.consumer.GenericFileResumable;
import org.apache.camel.resume.Offset;
import org.apache.camel.support.resume.Offsets;
import org.apache.camel.util.FileUtil;
@@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory;
/**
* Generic File. Specific implementations of a file based endpoint need to
provide a File for transfer.
*/
-public class GenericFile<T> implements WrappedFile<T>, GenericFileResumable<T>
{
+public class GenericFile<T> implements WrappedFile<T> {
private static final Logger LOG =
LoggerFactory.getLogger(GenericFile.class);
private final boolean probeContentType;
@@ -416,21 +415,14 @@ public class GenericFile<T> implements WrappedFile<T>,
GenericFileResumable<T> {
this.copyFromAbsoluteFilePath = copyFromAbsoluteFilePath;
}
- @Override
public void updateLastOffset(Long offset) {
this.lastOffset = offset;
}
- @Override
public Offset<Long> getLastOffset() {
return Offsets.of(lastOffset);
}
- @Override
- public T getAddressable() {
- return file;
- }
-
/**
* Fixes the path separator to be according to the protocol
*/
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
similarity index 81%
rename from
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
rename to
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
index 79899514a54..9d1de1297dd 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
@@ -17,8 +17,10 @@
package org.apache.camel.component.file.consumer;
-import org.apache.camel.resume.Resumable;
+import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
-public interface GenericFileResumable<T> extends Resumable<T, Long> {
+public interface DirectoryEntriesResumeAdapter {
+ default void setResumePayload(DirectoryEntries fileSet) {
+ }
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileOffsetResumeAdapter.java
similarity index 69%
rename from
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
rename to
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileOffsetResumeAdapter.java
index 2e1f5e56477..34d13264542 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileOffsetResumeAdapter.java
@@ -17,9 +17,21 @@
package org.apache.camel.component.file.consumer;
+import java.io.File;
+
+import org.apache.camel.component.file.GenericFile;
+
/**
- * Allows the implementation of file adapters for handling resume operations
for file sets (i.e.: file entries in a
- * directory)
+ * Provides an interface for adapters handling file offsets
*/
-public interface FileSetResumeAdapter extends FileResumeAdapter<FileResumeSet>
{
+public interface FileOffsetResumeAdapter {
+
+ /**
+ * Sets the resume payload used for the adapter
+ *
+ * @param genericFile a generic file instance
+ */
+ default void setResumePayload(GenericFile<File> genericFile) {
+
+ }
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
index 25b807cbf40..0b2f9c113d5 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
@@ -22,12 +22,6 @@ import org.apache.camel.resume.ResumeAdapter;
/**
* Defines resume adapter for consumers of the file component.
*/
-public interface FileResumeAdapter<T> extends ResumeAdapter {
+public interface FileResumeAdapter extends ResumeAdapter {
- /**
- * Returns the last offset read for the given file.
- *
- * @param resumable the resumable file or resumable set to run the resume
- */
- void resume(T resumable);
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/AbstractFileResumeAdapter.java
similarity index 50%
copy from
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
copy to
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/AbstractFileResumeAdapter.java
index 064b721c8cf..0195a9325f1 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/AbstractFileResumeAdapter.java
@@ -15,33 +15,40 @@
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.file.consumer.adapters;
+import java.io.File;
+
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.Cacheable;
import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.Resumable;
-import org.apache.camel.support.resume.Offsets;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.cache.ResumeCache;
-public class KafkaResumable implements Resumable<String, String> {
- private final String partition;
- private String offset;
+/**
+ * Base shared class for the file resume adapters
+ */
+abstract class AbstractFileResumeAdapter implements FileResumeAdapter,
Cacheable {
+ protected ResumeCache<File> cache;
- public KafkaResumable(String partition, String offset) {
- this.partition = partition;
- this.offset = offset;
+ protected AbstractFileResumeAdapter() {
}
+ @SuppressWarnings("unchecked")
@Override
- public void updateLastOffset(String offset) {
- this.offset = offset;
+ public final void setCache(ResumeCache<?> cache) {
+ this.cache = (ResumeCache<File>) cache;
}
@Override
- public Offset<String> getLastOffset() {
- return Offsets.of(offset);
+ public final ResumeCache<?> getCache() {
+ return cache;
}
@Override
- public String getAddressable() {
- return partition;
+ public final boolean add(OffsetKey<?> key, Offset<?> offset) {
+ return add(key.getKey(), offset.offset());
}
+
+ protected abstract boolean add(Object key, Object offset);
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
new file mode 100644
index 00000000000..09e33457adb
--- /dev/null
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+
+/**
+ * An implementation of the {@link FileResumeAdapter} that can be used for
resume operations for the file component.
+ * This one can be used to manage the resume operations for files within a
directory.
+ */
+class DefaultDirectoryEntriesResumeAdapter extends AbstractFileResumeAdapter
implements DirectoryEntriesResumeAdapter {
+ private DirectoryEntries fileSet;
+
+ private boolean notProcessed(File directory, File file) {
+ FileSet cached = cache.get(directory, FileSet.class);
+ if (cached == null) {
+ return true;
+ }
+
+ return !cached.contains(file);
+ }
+
+ @Override
+ public void setResumePayload(DirectoryEntries fileSet) {
+ assert fileSet != null;
+
+ this.fileSet = fileSet;
+ }
+
+ protected boolean add(Object key, Object offset) {
+ if (offset instanceof File) {
+ FileSet fileSet = (FileSet) cache.computeIfAbsent((File) key, k ->
new FileSet());
+
+ fileSet.update((File) offset);
+ } else {
+ throw new UnsupportedOperationException("This adapter cannot be
used for file offsets");
+ }
+
+ // For this one it's safe to always continue processing
+ return true;
+ }
+
+ private void resumeDirectoryEntries() {
+ DirectoryEntries.doResume(fileSet, f ->
notProcessed(fileSet.getDirectory(), f));
+ }
+
+ @Override
+ public void resume() {
+ resumeDirectoryEntries();
+ }
+
+ public void deserializeFileEntry(File keyObj, File valueObj) {
+ FileSet fileSet = (FileSet) cache.computeIfAbsent(keyObj, obj -> new
FileSet());
+
+ fileSet.update(valueObj);
+ }
+}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileOffsetResumeAdapter.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileOffsetResumeAdapter.java
new file mode 100644
index 00000000000..51ac52f7933
--- /dev/null
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileOffsetResumeAdapter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link FileResumeAdapter} that can be used for
resume operations for the file component.
+ * This can be used to manage the resume operations for a single file using
its offset.
+ */
+class DefaultFileOffsetResumeAdapter extends AbstractFileResumeAdapter
implements FileOffsetResumeAdapter {
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultFileOffsetResumeAdapter.class);
+
+ private GenericFile<File> genericFile;
+
+ private Offset<?> getLastOffset(File addressable) {
+ return cache.get(addressable, Offset.class);
+ }
+
+ @Override
+ public void setResumePayload(GenericFile<File> genericFile) {
+ assert genericFile != null;
+ this.genericFile = genericFile;
+ }
+
+ public boolean add(Object key, Object offset) {
+ if (offset instanceof Long) {
+ FileOffset fileOffset = (FileOffset) cache.computeIfAbsent((File)
key, k -> new FileOffset());
+
+ fileOffset.update((Long) offset);
+ } else {
+ throw new UnsupportedOperationException("This adapter cannot be
used for directory entries");
+ }
+
+ // For this one it's safe to always continue processing
+ return true;
+ }
+
+ private void resumeFileOffsets() {
+ if (genericFile == null) {
+ return;
+ }
+
+ final Offset<?> lastOffset = getLastOffset(genericFile.getFile());
+
+ if (lastOffset == null) {
+ return;
+ }
+
+ Object offsetObj = lastOffset.offset();
+ if (offsetObj == null) {
+ return;
+ }
+
+ if (offsetObj instanceof Long) {
+ genericFile.updateLastOffset((Long) offsetObj);
+ } else {
+ // This should never happen
+ LOG.warn("Cannot perform a resume operation of an object of
unhandled type: {}", offsetObj.getClass());
+ }
+ }
+
+ @Override
+ public void resume() {
+ resumeFileOffsets();
+ }
+
+ public void deserializeFileOffset(File keyObj, Long valueObj) {
+ FileOffset longOffset = (FileOffset) cache.computeIfAbsent(keyObj, obj
-> new FileOffset());
+
+ longOffset.update(valueObj);
+ }
+}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
deleted file mode 100644
index 9a1f7f49f76..00000000000
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.file.consumer.adapters;
-
-import java.io.File;
-
-import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
-import org.apache.camel.resume.cache.MultiEntryCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the {@link FileSetResumeAdapter} that can be used for
resume operations for multiple files. For
- * instance, this can be used to manage the resume operations for files within
a directory.
- */
-public class DefaultFileSetResumeAdapter implements FileSetResumeAdapter {
- private static final Logger LOG =
LoggerFactory.getLogger(DefaultFileSetResumeAdapter.class);
-
- private final MultiEntryCache<File, File> cache;
-
- public DefaultFileSetResumeAdapter(MultiEntryCache<File, File> cache) {
- this.cache = cache;
- }
-
- private boolean notProcessed(File file) {
- File key = file.getParentFile();
-
- // if the file is in the cache, then it's already processed
- boolean ret = !cache.contains(key, file);
- return ret;
- }
-
- @Override
- public void resume(FileResumeSet resumable) {
- if (resumable != null) {
- resumable.resumeEach(this::notProcessed);
- if (resumable.hasResumables()) {
- LOG.debug("There's {} files to still to be processed",
resumable.resumed().length);
- }
- } else {
- LOG.trace("Nothing to resume");
- }
- }
-
- @Override
- public void resume() {
- // NO-OP
- }
-}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
deleted file mode 100644
index 54e339fe73e..00000000000
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.file.consumer.adapters;
-
-import java.io.File;
-import java.util.Optional;
-
-import org.apache.camel.component.file.consumer.GenericFileResumable;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
-import org.apache.camel.resume.cache.SingleEntryCache;
-
-/**
- * An implementation of the {@link GenericFileResumeAdapter} that can be used
to handle resume operations for file
- * offsets (where the offsets are of Long format).
- */
-public class DefaultGenericFileResumeAdapter implements
GenericFileResumeAdapter {
- private final SingleEntryCache<File, Long> cache;
-
- public DefaultGenericFileResumeAdapter(SingleEntryCache<File, Long> cache)
{
- this.cache = cache;
- }
-
- private Optional<Long> getLastOffset(GenericFileResumable<File> resumable)
{
- final File addressable = resumable.getAddressable();
- return cache.get(addressable);
- }
-
- @Override
- public Optional<Long> getLastOffset(File addressable) {
- return cache.get(addressable);
- }
-
- @Override
- public void resume(GenericFileResumable<File> resumable) {
- final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
-
- if (!lastOffsetOpt.isPresent()) {
- return;
- }
-
- final long lastOffset = lastOffsetOpt.get();
- resumable.updateLastOffset(lastOffset);
- }
-
- @Override
- public void resume() {
-
- }
-}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
similarity index 70%
rename from
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
rename to
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
index 41b1751223c..5795dc5042e 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
@@ -15,35 +15,28 @@
* limitations under the License.
*/
-package org.apache.camel.component.file.consumer;
+package org.apache.camel.component.file.consumer.adapters;
import java.io.File;
import java.util.Objects;
import java.util.function.Predicate;
-import org.apache.camel.resume.ResumableSet;
+import org.apache.camel.support.resume.Resumables;
/**
* This contains the input/output file set for resume operations.
*/
-public final class FileResumeSet implements ResumableSet<File> {
+public final class DirectoryEntries {
+ private final File directory;
private final File[] inputFiles;
private File[] outputFiles;
- public FileResumeSet(File[] inputFiles) {
+ public DirectoryEntries(File directory, File[] inputFiles) {
+ this.directory = directory;
this.inputFiles = Objects.requireNonNull(inputFiles,
"A list of input files must be provided for the resume info");
}
- /**
- * Iterates over the set of input files checking if they should be resumed
or not
- *
- * @param resumableCheck a checker method that returns true if the file
should be resumed or false otherwise
- */
- public void resumeEach(Predicate<File> resumableCheck) {
- this.outputFiles = resumeEach(inputFiles, resumableCheck);
- }
-
/**
* Gets the files that should be resumed
*
@@ -65,4 +58,18 @@ public final class FileResumeSet implements
ResumableSet<File> {
return false;
}
+
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setOutputFiles(File[] resumed) {
+ this.outputFiles = resumed;
+ }
+
+ public static void doResume(DirectoryEntries directoryEntries,
Predicate<File> resumableCheck) {
+ File[] processed = Resumables.resumeEach(directoryEntries.inputFiles,
+ resumableCheck);
+ directoryEntries.setOutputFiles(processed);
+ }
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileOffset.java
similarity index 63%
rename from
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
rename to
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileOffset.java
index 11a0879a59c..26ec8477d64 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileOffset.java
@@ -15,25 +15,32 @@
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.file.consumer.adapters;
import org.apache.camel.resume.Offset;
-import org.apache.camel.util.KeyValueHolder;
-/**
- * Offset class for Kafka
- */
-public class KafkaOffset implements Offset<KeyValueHolder<String, String>> {
- private final String topicPartition;
- private final String offset;
+public class FileOffset implements Offset<Long> {
+ private Long offset;
+
+ public FileOffset() {
+ }
- public KafkaOffset(String topicPartition, String offset) {
- this.topicPartition = topicPartition;
+ public FileOffset(Long offset) {
this.offset = offset;
}
@Override
- public KeyValueHolder<String, String> offset() {
- return new KeyValueHolder<>(topicPartition, offset);
+ public void update(Long offset) {
+ this.offset = offset;
+ }
+
+ @Deprecated
+ public Object getLastOffset() {
+ return offset;
+ }
+
+ @Override
+ public Long offset() {
+ return offset;
}
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
new file mode 100644
index 00000000000..f75e51d199b
--- /dev/null
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.cache.ResumeCache;
+
+public class FileResumeAdapterDelegate
+ implements FileResumeAdapter, Cacheable, Deserializable,
FileOffsetResumeAdapter, DirectoryEntriesResumeAdapter {
+ private final DefaultDirectoryEntriesResumeAdapter
directoryEntriesResumeAdapter
+ = new DefaultDirectoryEntriesResumeAdapter();
+ private final DefaultFileOffsetResumeAdapter fileOffsetResumeAdapter = new
DefaultFileOffsetResumeAdapter();
+
+ @Override
+ public void setResumePayload(GenericFile<File> genericFile) {
+ fileOffsetResumeAdapter.setResumePayload(genericFile);
+ }
+
+ @Override
+ public void setResumePayload(DirectoryEntries fileSet) {
+ directoryEntriesResumeAdapter.setResumePayload(fileSet);
+ }
+
+ @Override
+ public boolean add(OffsetKey<?> key, Offset<?> offset) {
+ Object offsetObj = offset.offset();
+
+ if (offsetObj instanceof Long) {
+ return fileOffsetResumeAdapter.add(key, offset);
+ } else {
+ return directoryEntriesResumeAdapter.add(key, offset);
+ }
+ }
+
+ @Override
+ public void setCache(ResumeCache<?> cache) {
+ fileOffsetResumeAdapter.setCache(cache);
+ directoryEntriesResumeAdapter.setCache(cache);
+ }
+
+ @Override
+ public ResumeCache<?> getCache() {
+ return fileOffsetResumeAdapter.getCache();
+ }
+
+ @Override
+ public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+ Object keyObj = deserializeObject(keyBuffer);
+ Object valueObj = deserializeObject(valueBuffer);
+
+ if (valueObj instanceof File) {
+ directoryEntriesResumeAdapter.deserializeFileEntry((File) keyObj,
(File) valueObj);
+ }
+
+ if (valueObj instanceof Long) {
+ fileOffsetResumeAdapter.deserializeFileOffset((File) keyObj,
(Long) valueObj);
+ }
+
+ return true;
+ }
+
+ @Override
+ public void resume() {
+ fileOffsetResumeAdapter.resume();
+ directoryEntriesResumeAdapter.resume();
+ }
+}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileSet.java
similarity index 55%
rename from
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
rename to
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileSet.java
index 8f7c1f2900c..e4502fc7c35 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileSet.java
@@ -15,20 +15,41 @@
* limitations under the License.
*/
-package org.apache.camel.component.file.consumer;
+package org.apache.camel.component.file.consumer.adapters;
import java.io.File;
-import java.util.Optional;
+import java.util.ArrayList;
+import java.util.List;
-/**
- * Allows the implementation of file adapters for handling resume operations
for generic files
- */
-public interface GenericFileResumeAdapter extends
FileResumeAdapter<GenericFileResumable<File>> {
- /**
- * Gets the last offset for the given file
- *
- * @param addressable the file instance
- * @return An Optional with the offset value
- */
- Optional<Long> getLastOffset(File addressable);
+import org.apache.camel.resume.Offset;
+
+public class FileSet implements Offset<File> {
+ private final List<File> files = new ArrayList<>();
+
+ public FileSet() {
+
+ }
+
+ public FileSet(File offset) {
+ files.add(offset);
+ }
+
+ @Deprecated
+ public Object getLastOffset() {
+ return files;
+ }
+
+ public boolean contains(Object o) {
+ return files.contains(o);
+ }
+
+ @Override
+ public void update(File offset) {
+ files.add(offset);
+ }
+
+ @Override
+ public File offset() {
+ return null;
+ }
}
diff --git
a/components/camel-file/src/main/resources/org/apache/camel/resume/adapter.properties
b/components/camel-file/src/main/resources/org/apache/camel/resume/adapter.properties
new file mode 100644
index 00000000000..bb7eec9fbd3
--- /dev/null
+++
b/components/camel-file/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+adapterClass=org.apache.camel.component.file.consumer.adapters.FileResumeAdapterDelegate
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
index 064b721c8cf..c49d3e085bd 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
@@ -18,30 +18,27 @@
package org.apache.camel.component.kafka.consumer.support;
import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
+import org.apache.camel.support.resume.OffsetKeys;
import org.apache.camel.support.resume.Offsets;
-public class KafkaResumable implements Resumable<String, String> {
+public class KafkaResumable implements Resumable {
private final String partition;
- private String offset;
+ private final String offset;
public KafkaResumable(String partition, String offset) {
this.partition = partition;
this.offset = offset;
}
- @Override
- public void updateLastOffset(String offset) {
- this.offset = offset;
- }
-
@Override
public Offset<String> getLastOffset() {
return Offsets.of(offset);
}
@Override
- public String getAddressable() {
- return partition;
+ public OffsetKey<?> getOffsetKey() {
+ return OffsetKeys.unmodifiableOf(partition);
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
index a1614d4bfc2..9951e09303b 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
@@ -24,9 +24,8 @@ import
org.apache.camel.resume.UpdatableConsumerResumeStrategy;
/**
* Base interface for resume strategies that publish the offsets to a Kafka
topic
*
- * @param <K> the type of key
- * @param <V> the type of the value
+ * @param <T> the type of resumable
*/
-public interface KafkaResumeStrategy<K, V> extends
UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, ResumeStrategy {
+public interface KafkaResumeStrategy<T extends Resumable> extends
UpdatableConsumerResumeStrategy<T>, ResumeStrategy {
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index e9974cac4e8..7ece1b23699 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -17,16 +17,19 @@
package org.apache.camel.processor.resume.kafka;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.camel.resume.ResumeAdapter;
-import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Resumable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,9 +39,8 @@ import org.slf4j.LoggerFactory;
* integrations. This is suitable, for instance, when using clusters with the
master component.
*
- * @param <K> the type of key
+ * @param <K> the type of resumable
- * @param <V> the type of the value
*/
-public class MultiNodeKafkaResumeStrategy<K, V> extends
SingleNodeKafkaResumeStrategy<K, V> {
+public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends
SingleNodeKafkaResumeStrategy<K> {
private static final Logger LOG =
LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
private final ExecutorService executorService;
@@ -47,29 +49,23 @@ public class MultiNodeKafkaResumeStrategy<K, V> extends
SingleNodeKafkaResumeStr
*
* @param bootstrapServers the address of the Kafka broker
* @param topic the topic where to publish the offsets
- * @param resumeCache a cache instance where to store the offsets
locally for faster access
- * @param resumeAdapter the component-specific resume adapter
*/
- public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic,
ResumeCache<K, V> resumeCache,
- ResumeAdapter resumeAdapter) {
+ public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic)
{
// just in case users don't want to provide their own worker thread
pool
- this(bootstrapServers, topic, resumeCache, resumeAdapter,
Executors.newSingleThreadExecutor());
+ this(bootstrapServers, topic, Executors.newSingleThreadExecutor());
}
/**
* Builds an instance of this class
*
- * @param bootstrapServers
+ * @param bootstrapServers the address of the Kafka broker
* @param topic the topic where to publish the offsets
- * @param resumeCache a cache instance where to store the offsets
locally for faster access
- * @param resumeAdapter the component-specific resume adapter
* @param executorService an executor service that will run a separate
thread for periodically refreshing the
* offsets
*/
- public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic,
ResumeCache<K, V> resumeCache,
- ResumeAdapter resumeAdapter,
ExecutorService executorService) {
- super(bootstrapServers, topic, resumeCache, resumeAdapter);
+ public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic,
ExecutorService executorService) {
+ super(bootstrapServers, topic);
// We need to keep refreshing the cache
this.executorService = executorService;
@@ -80,36 +76,56 @@ public class MultiNodeKafkaResumeStrategy<K, V> extends
SingleNodeKafkaResumeStr
* Builds an instance of this class
*
* @param topic the topic where to publish the offsets
- * @param resumeCache a cache instance where to store the offsets
locally for faster access
- * @param resumeAdapter the component-specific resume adapter
* @param producerConfig the set of properties to be used by the Kafka
producer within this class
* @param consumerConfig the set of properties to be used by the Kafka
consumer within this class
*/
- public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V>
resumeCache, ResumeAdapter resumeAdapter,
- Properties producerConfig, Properties
consumerConfig) {
- this(topic, resumeCache, resumeAdapter, producerConfig,
consumerConfig, Executors.newSingleThreadExecutor());
+ public MultiNodeKafkaResumeStrategy(String topic, Properties
producerConfig, Properties consumerConfig) {
+ this(topic, producerConfig, consumerConfig,
Executors.newSingleThreadExecutor());
}
/**
* Builds an instance of this class
*
* @param topic the topic where to publish the offsets
- * @param resumeCache a cache instance where to store the offsets
locally for faster access
- * @param resumeAdapter the component-specific resume adapter
* @param producerConfig the set of properties to be used by the Kafka
producer within this class
* @param consumerConfig the set of properties to be used by the Kafka
consumer within this class
* @param executorService an executor service that will run a separate
thread for periodically refreshing the
* offsets
*/
- public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V>
resumeCache, ResumeAdapter resumeAdapter,
- Properties producerConfig, Properties
consumerConfig, ExecutorService executorService) {
- super(topic, resumeCache, resumeAdapter, producerConfig,
consumerConfig);
+ public MultiNodeKafkaResumeStrategy(String topic, Properties
producerConfig, Properties consumerConfig,
+ ExecutorService executorService) {
+ super(topic, producerConfig, consumerConfig);
this.executorService = executorService;
executorService.submit(() -> refresh());
}
+ protected void poll() {
+ poll(getConsumer());
+ }
+
+ protected void poll(Consumer<byte[], byte[]> consumer) {
+ Deserializable deserializable = (Deserializable) getAdapter();
+
+ ConsumerRecords<byte[], byte[]> records;
+ do {
+ records = consume(10, consumer);
+
+ if (records.isEmpty()) {
+ break;
+ }
+
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ byte[] value = record.value();
+
+ LOG.trace("Read from Kafka: {}", value);
+
+ deserializable.deserialize(ByteBuffer.wrap(record.key()),
ByteBuffer.wrap(record.value()));
+ }
+ } while (true);
+ }
+
/**
* Launch a thread to refresh the offsets periodically
*/
@@ -119,22 +135,10 @@ public class MultiNodeKafkaResumeStrategy<K, V> extends
SingleNodeKafkaResumeStr
Properties prop = (Properties) getConsumerConfig().clone();
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
- Consumer<K, V> consumer = new KafkaConsumer<>(prop);
-
- consumer.subscribe(Collections.singletonList(getTopic()));
-
- while (true) {
- var records = consumer.poll(getPollDuration());
- if (records.isEmpty()) {
- continue;
- }
-
- for (var record : records) {
- V value = record.value();
+ try (Consumer<byte[], byte[]> consumer = new
KafkaConsumer<>(prop)) {
+ consumer.subscribe(Collections.singletonList(getTopic()));
- LOG.trace("Read from Kafka: {}", value);
- getResumeCache().add(record.key(), record.value());
- }
+ poll(consumer);
}
} catch (Exception e) {
LOG.error("Error while refreshing the local cache: {}",
e.getMessage(), e);
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 0a14bb481f6..0274fbd2e30 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -18,6 +18,7 @@
package org.apache.camel.processor.resume.kafka;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@@ -28,6 +29,11 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
@@ -45,8 +51,8 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,54 +60,45 @@ import org.slf4j.LoggerFactory;
* A resume strategy that publishes offsets to a Kafka topic. This resume
strategy is suitable for single node
* integrations. For multi-node integrations (i.e., using clusters with the
master component), check
* {@link MultiNodeKafkaResumeStrategy}.
- *
- * @param <K> the type of key
- * @param <V> the type of the value
+ * @param <T> the type of resumable
*/
-public class SingleNodeKafkaResumeStrategy<K, V> implements
KafkaResumeStrategy<K, V> {
+public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements
KafkaResumeStrategy<T> {
private static final Logger LOG =
LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
private final String topic;
- private Consumer<K, V> consumer;
- private Producer<K, V> producer;
+ private Consumer<byte[], byte[]> consumer;
+ private Producer<byte[], byte[]> producer;
private Duration pollDuration = Duration.ofSeconds(1);
private final Queue<RecordError> producerErrors = new
ConcurrentLinkedQueue<>();
- private final ResumeCache<K, V> resumeCache;
+
private boolean subscribed;
private final Properties producerConfig;
private final Properties consumerConfig;
- private ResumeAdapter resumeAdapter;
+ private ResumeAdapter adapter;
/**
* Builds an instance of this class
*
* @param bootstrapServers the address of the Kafka broker
* @param topic the topic where to publish the offsets
- * @param resumeCache a cache instance where to store the offsets
locally for faster access
- * @param resumeAdapter the component-specific resume adapter
+ *
*/
- public SingleNodeKafkaResumeStrategy(String bootstrapServers, String
topic, ResumeCache<K, V> resumeCache,
- ResumeAdapter resumeAdapter) {
- this(topic, resumeCache, resumeAdapter,
createProducer(bootstrapServers), createConsumer(bootstrapServers));
+ public SingleNodeKafkaResumeStrategy(String bootstrapServers, String
topic) {
+ this(topic, createProducer(bootstrapServers),
createConsumer(bootstrapServers));
}
/**
* Builds an instance of this class
*
* @param topic the topic where to publish the offsets
- * @param resumeCache a cache instance where to store the offsets
locally for faster access
- * @param resumeAdapter the component-specific resume adapter
* @param producerConfig the set of properties to be used by the Kafka
producer within this class
* @param consumerConfig the set of properties to be used by the Kafka
consumer within this class
*/
- public SingleNodeKafkaResumeStrategy(String topic, ResumeCache<K, V>
resumeCache, ResumeAdapter resumeAdapter,
- Properties producerConfig,
+ public SingleNodeKafkaResumeStrategy(String topic, Properties
producerConfig,
Properties consumerConfig) {
this.topic = ObjectHelper.notNull(topic, "The topic must not be null");
- this.resumeCache = resumeCache;
- this.resumeAdapter = resumeAdapter;
this.producerConfig = producerConfig;
this.consumerConfig = consumerConfig;
@@ -117,8 +114,8 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements
KafkaResumeStrategy<
public static Properties createProducer(String bootstrapServers) {
Properties config = new Properties();
- config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
- config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -135,8 +132,8 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements
KafkaResumeStrategy<
public static Properties createConsumer(String bootstrapServers) {
Properties config = new Properties();
- config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
- config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -160,8 +157,8 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements
KafkaResumeStrategy<
* @throws InterruptedException
*
*/
- protected void produce(K key, V message) throws ExecutionException,
InterruptedException {
- ProducerRecord<K, V> record = new ProducerRecord<>(topic, key,
message);
+ protected void produce(byte[] key, byte[] message) throws
ExecutionException, InterruptedException {
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic,
key, message);
producer.send(record, (recordMetadata, e) -> {
if (e != null) {
@@ -171,16 +168,27 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
});
}
+ protected void doAdd(OffsetKey<?> key, Offset<?> offsetValue) {
+ if (adapter instanceof Cacheable) {
+ Cacheable cacheable = (Cacheable) adapter;
+
+ cacheable.add(key, offsetValue);
+ }
+ }
+
@Override
- public void updateLastOffset(Resumable<K, V> offset) throws Exception {
- K key = offset.getAddressable();
- V offsetValue = offset.getLastOffset().offset();
+ public void updateLastOffset(T offset) throws Exception {
+ OffsetKey<?> key = offset.getOffsetKey();
+ Offset<?> offsetValue = offset.getLastOffset();
+
+ LOG.debug("Updating offset on Kafka with key {} to {}", key.getKey(),
offsetValue.offset());
- LOG.debug("Updating offset on Kafka with key {} to {}", key,
offsetValue);
+ ByteBuffer keyBuffer = key.serialize();
+ ByteBuffer valueBuffer = offsetValue.serialize();
- produce(key, offsetValue);
+ produce(keyBuffer.array(), valueBuffer.array());
- resumeCache.add(key, offsetValue);
+ doAdd(key, offsetValue);
}
/**
@@ -188,12 +196,23 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
*
* @throws Exception
*/
- protected void loadCache() throws Exception {
+ public void loadCache() throws Exception {
subscribe();
LOG.debug("Loading records from topic {}", topic);
- ConsumerRecords<K, V> records;
+ if (!(adapter instanceof Deserializable)) {
+ throw new RuntimeCamelException("Cannot load data for an adapter
that is not deserializable");
+ }
+ poll();
+
+ unsubscribe();
+ }
+
+ protected void poll() {
+ Deserializable deserializable = (Deserializable) adapter;
+
+ ConsumerRecords<byte[], byte[]> records;
do {
records = consume();
@@ -201,19 +220,16 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
break;
}
- for (ConsumerRecord<K, V> record : records) {
- V value = record.value();
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ byte[] value = record.value();
LOG.trace("Read from Kafka: {}", value);
- resumeCache.add(record.key(), record.value());
- if (resumeCache.isFull()) {
+ if (!deserializable.deserialize(ByteBuffer.wrap(record.key()),
ByteBuffer.wrap(record.value()))) {
break;
}
}
} while (true);
-
- unsubscribe();
}
/**
@@ -247,7 +263,7 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements
KafkaResumeStrategy<
* Creates a new consumer rebalance listener. This can be useful for
setting the exact Kafka offset when necessary
* to read a limited amount of messages or customize the resume strategy
behavior when a rebalance occurs.
*
- * @param remaining
+ * @param remaining the number of remaining messages on the topic to try
to collect
* @return
*/
protected ConsumerRebalanceListener getConsumerRebalanceListener(long
remaining) {
@@ -292,7 +308,7 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements
KafkaResumeStrategy<
*
* @return An instance of the consumer records
*/
- protected ConsumerRecords<K, V> consume() {
+ protected ConsumerRecords<byte[], byte[]> consume() {
int retries = 10;
return consume(retries);
@@ -304,9 +320,28 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
* @param retries how many times to retry consuming data from the topic
* @return An instance of the consumer records
*/
- protected ConsumerRecords<K, V> consume(int retries) {
+ protected ConsumerRecords<byte[], byte[]> consume(int retries) {
while (retries > 0) {
- ConsumerRecords<K, V> records = consumer.poll(pollDuration);
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(pollDuration);
+ if (!records.isEmpty()) {
+ return records;
+ }
+ retries--;
+ }
+
+ return ConsumerRecords.empty();
+ }
+
+ /**
+ * Consumes messages from the topic that was previously set up
+ *
+ * @param retries how many times to retry consuming data from the topic
+ * @param consumer the kafka consumer object instance to use
+ * @return An instance of the consumer records
+ */
+ protected ConsumerRecords<byte[], byte[]> consume(int retries,
Consumer<byte[], byte[]> consumer) {
+ while (retries > 0) {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(pollDuration);
if (!records.isEmpty()) {
return records;
}
@@ -317,8 +352,14 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
}
public void subscribe() throws Exception {
- if (resumeCache.capacity() >= 1) {
- checkAndSubscribe(topic, resumeCache.capacity());
+ if (adapter instanceof Cacheable) {
+ ResumeCache<?> cache = ((Cacheable) adapter).getCache();
+
+ if (cache.capacity() >= 1) {
+ checkAndSubscribe(topic, cache.capacity());
+ } else {
+ checkAndSubscribe(topic);
+ }
} else {
checkAndSubscribe(topic);
}
@@ -326,13 +367,18 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
@Override
public ResumeAdapter getAdapter() {
- return resumeAdapter;
+ return adapter;
+ }
+
+ @Override
+ public void setAdapter(ResumeAdapter adapter) {
+ this.adapter = adapter;
}
/**
* Gets the set record of sent items
*
- * @return
+ * @return A collection with all the record errors
*/
protected Collection<RecordError> getProducerErrors() {
return Collections.unmodifiableCollection(producerErrors);
@@ -373,12 +419,6 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
@Override
public void start() {
LOG.info("Starting the kafka resume strategy");
-
- try {
- loadCache();
- } catch (Exception e) {
- LOG.error("Failed to load already processed items: {}",
e.getMessage(), e);
- }
}
public Duration getPollDuration() {
@@ -389,11 +429,11 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
this.pollDuration = Objects.requireNonNull(pollDuration, "The poll
duration cannot be null");
}
- protected Consumer<K, V> getConsumer() {
+ protected Consumer<byte[], byte[]> getConsumer() {
return consumer;
}
- protected Producer<K, V> getProducer() {
+ protected Producer<byte[], byte[]> getProducer() {
return producer;
}
@@ -409,14 +449,11 @@ public class SingleNodeKafkaResumeStrategy<K, V>
implements KafkaResumeStrategy<
return topic;
}
- protected ResumeCache<K, V> getResumeCache() {
- return resumeCache;
- }
-
/**
* Clear the producer errors
*/
public void resetProducerErrors() {
producerErrors.clear();
}
+
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 57669df0920..7397109fb3a 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -31,6 +31,7 @@ import
org.apache.camel.component.kafka.consumer.support.KafkaResumable;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
@@ -60,10 +61,9 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends
BaseEmbeddedKafkaTes
@BindToRegistry("resumeStrategy")
private TestUpdateStrategy resumeStrategy;
private CountDownLatch messagesLatch;
- private KafkaProducer<Object, Object> producer;
private static class TestUpdateStrategy extends TransientResumeStrategy
- implements UpdatableConsumerResumeStrategy<String, Integer,
Resumable<String, Integer>> {
+ implements UpdatableConsumerResumeStrategy<Resumable> {
private final CountDownLatch messagesLatch;
private boolean startCalled;
private boolean offsetNull = true;
@@ -91,25 +91,25 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends
BaseEmbeddedKafkaTes
}
@Override
- public void updateLastOffset(Resumable<String, Integer> offset) throws
Exception {
+ public void updateLastOffset(Resumable offset) {
try {
if (offset != null) {
offsetNull = false;
- String addressable = offset.getAddressable();
+ OffsetKey<?> addressable = offset.getOffsetKey();
if (addressable != null) {
offsetAddressableNull = false;
- offsetAddressableEmpty = addressable.isEmpty() ||
addressable.isBlank();
+ offsetAddressableEmpty = addressable.getKey() == null;
}
- Offset<Integer> offsetValue = offset.getLastOffset();
+ Offset<?> offsetValue = offset.getLastOffset();
if (offsetValue != null) {
offsetValueNull = false;
if (offsetValue.offset() != null) {
offsetValueEmpty = false;
- lastOffset = offsetValue.offset();
+ lastOffset = (int) offsetValue.offset();
}
}
}
@@ -176,9 +176,9 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends
BaseEmbeddedKafkaTes
@BeforeEach
public void before() {
Properties props = getDefaultProperties();
+ KafkaProducer<Object, Object> producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
- producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
}
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
similarity index 54%
copy from
core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
index 15e38fa0621..1b2e9cc5a36 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
@@ -17,27 +17,31 @@
package org.apache.camel.resume;
-import org.apache.camel.Service;
+import org.apache.camel.resume.cache.ResumeCache;
/**
- * Defines a strategy for handling resume operations. Implementations can
define different ways to handle how to resume
- * processing records.
+ * Used to identify objects that can cache their resume state or data
*/
-public interface ResumeStrategy extends Service {
+public interface Cacheable {
/**
- * Gets an adapter for resuming operations
+ * Adds an offset key and value to the cache
+ * @param key the key to add
+ * @param offset the offset to add
+ * @return true if added successfully (i.e.: the cache is not full) or
false otherwise
*/
- ResumeAdapter getAdapter();
+ boolean add(OffsetKey<?> key, Offset<?> offset);
/**
- * Gets and adapter for resuming operations
- *
- * @param clazz the class of the adapter
- * @return the adapter or null if it can't be cast to the requested
class
- * @param <T> the type of the adapter
+ * Sets the cache in resume adapters and objects that cache their data
+ * @param cache A resume cache instance
*/
- default <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
- return clazz.cast(getAdapter());
- }
+ void setCache(ResumeCache<?> cache);
+
+
+ /**
+ * Gets the cache in resume adapters and objects that cache their data
+ * @return A resume cache instance
+ */
+ ResumeCache<?> getCache();
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java
b/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java
new file mode 100644
index 00000000000..630dc7bc2d6
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.resume;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+public interface Deserializable {
+
+ default Object deserializeObject(ByteBuffer buffer) {
+ buffer.clear();
+
+ int dataType = buffer.getInt();
+ switch (dataType) {
+ case Serializable.TYPE_INTEGER: {
+ return buffer.getInt();
+ }
+ case Serializable.TYPE_LONG: {
+ return buffer.getLong();
+ }
+ case Serializable.TYPE_STRING: {
+ byte[] tmp = new byte[1024];
+ buffer.get(tmp, 0, 1024);
+
+ return new String(tmp);
+ }
+ case Serializable.TYPE_FILE: {
+ int remaining = buffer.remaining();
+ byte[] tmp = new byte[remaining];
+ buffer.get(tmp);
+
+ return new File(new String(tmp));
+ }
+ default: {
+ return null;
+ }
+ }
+ }
+
+ boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer);
+}
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
index c41f312d49a..a6e5519f296 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
@@ -17,12 +17,20 @@
package org.apache.camel.resume;
+import java.nio.ByteBuffer;
+
/**
* Generic offset without a concrete type
*
* @param <T> the type of the offset
*/
-public interface Offset<T> {
+public interface Offset<T> extends Serializable {
+
+ /**
+ * Sets the current offset value
+ * @param offset the current offset value
+ */
+ void update(T offset);
/**
* Gets the offset value
@@ -31,4 +39,8 @@ public interface Offset<T> {
*/
T offset();
+ @Override
+ default ByteBuffer serialize() {
+ return serialize(offset());
+ }
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
b/core/camel-api/src/main/java/org/apache/camel/resume/OffsetKey.java
similarity index 63%
rename from
core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/OffsetKey.java
index 2928dd8a26f..4299dbbdeef 100644
---
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/OffsetKey.java
@@ -15,24 +15,29 @@
* limitations under the License.
*/
-package org.apache.camel.resume.cache;
+package org.apache.camel.resume;
-import java.util.Optional;
+import java.nio.ByteBuffer;
/**
- * A resume cache where a single key can only be mapped to a single entry
- *
- * @param <K> the type the key
- * @param <V> the type of the entry
+ * An interface to represent offset keys (addressable for an offset)
+ * @param <K> the type of the offset key
*/
-public interface SingleEntryCache<K, V> extends ResumeCache<K, V> {
+public interface OffsetKey<K> extends Serializable {
+ /**
+ * Sets the key
+ * @param key the key value
+ */
+ void setKey(K key);
/**
- * Gets the offset value for the key
- *
- * @param key the key
- * @return the key
+ * Gets the key
+ * @return the key instance
*/
- Optional<V> get(K key);
+ K getKey();
+ @Override
+ default ByteBuffer serialize() {
+ return serialize(getKey());
+ }
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
index 108af04293a..586616c960d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
@@ -22,30 +22,21 @@ package org.apache.camel.resume;
* For example, when reading large files, it may be possible to inform the
last offset that was read, thus allowing
* users of this interface to skip to that offset. This can potentially
improve resumable operations by allowing
* reprocessing of data.
- *
- * @param <Y> the type of the key, name or object that can be addressed by the
given offset
- * @param <T> the type of the addressable value for the resumable object (for
example, a file would use a Long value)
*/
-public interface Resumable<Y, T> {
+public interface Resumable {
/**
- * Updates the last offset as appropriate for the user of the interface
+ * Gets the offset key (i.e.: the addressable part of the resumable object)
*
- * @param offset the offset value
+ * @return An OffsetKey instance with the addressable part of the object.
May return null or an EmptyOffset
+ * depending on the type of the resumable
*/
- void updateLastOffset(T offset);
+ OffsetKey<?> getOffsetKey();
/**
* Gets the last offset value
*
* @return the last offset value according to the interface and type
implemented
*/
- Offset<T> getLastOffset();
-
- /**
- * Gets the addressable part (key) of the resumable
- *
- * @return the addressable part of the resumable
- */
- Y getAddressable();
+ Offset<?> getLastOffset();
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
b/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
deleted file mode 100644
index d09a61f7e72..00000000000
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.resume;
-
-import java.lang.reflect.Array;
-import java.util.Arrays;
-import java.util.function.Predicate;
-
-/**
- * An interface that represents a set of resumables (i.e.: files in a
directory, rows in a database, etc)
- *
- * @param <T> the indivudal type of each member of the set
- */
-public interface ResumableSet<T> {
-
- /**
- * Iterates over the set of input checking if they should be resumed or
not.
- *
- * @param input the input array to check for resumables
- * @param resumableCheck a checker method that returns true if a single
entry of the input should be resumed or
- * false otherwise. For instance: given a set A, B
and C, where B has already been processed,
- * then a test for A and C returns true, whereas a
test for B returns false.
- * @return a new array containing the elements that still
need to be processed
- */
- default T[] resumeEach(T[] input, Predicate<T> resumableCheck) {
- @SuppressWarnings("unchecked")
- T[] tmp = (T[]) Array.newInstance(input.getClass().getComponentType(),
input.length);
- int count = 0;
-
- for (T entry : input) {
- if (resumableCheck.test(entry)) {
- tmp[count] = entry;
- count++;
- }
- }
-
- if (count != input.length) {
- return Arrays.copyOf(tmp, count);
- }
-
- return input;
- }
-
- /**
- * Iterates over the set of input checking if they should be resumed or not
- *
- * @param resumableCheck a checker method that returns true if a single
entry of the input should be resumed or
- * false otherwise. For instance: given a set A, B
and C, where B has already been processed,
- * then a test for A and C returns true, whereas a
test for B returns false.
- */
- void resumeEach(Predicate<T> resumableCheck);
-
- /**
- * Gets the input that should be resumed
- *
- * @return an array with the input that should be resumed
- */
- T[] resumed();
-
- /**
- * Whether there are resumable entries to process
- *
- * @return true if there are resumable entries or false otherwise
- */
- boolean hasResumables();
-}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
index 1966c815c58..ae650304d3c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
@@ -21,6 +21,9 @@ package org.apache.camel.resume;
* A resume adapter provides the component-specific logic that plugs the more
generic strategic with the lower level
* requirements of the component being used.
*
+ * The adapter class responsibility is to bind the component-specific part of
the logic to the more generic handling of
+ * the resume strategy. The adapter is always component specific and some
components may have more than one.
+ *
* It is the responsibility of the supported components to implement the
custom implementation for this part of the
* resume API, as well as to offer component-specific interfaces that can be
specialized by other integrations.
*/
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 15e38fa0621..a1ffc6c67cc 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -25,6 +25,12 @@ import org.apache.camel.Service;
*/
public interface ResumeStrategy extends Service {
+ /**
+ * Sets an adapter for resuming operations with this strategy
+ * @param adapter the component-specific resume adapter
+ */
+ void setAdapter(ResumeAdapter adapter);
+
/**
* Gets an adapter for resuming operations
*/
@@ -40,4 +46,13 @@ public interface ResumeStrategy extends Service {
default <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
return clazz.cast(getAdapter());
}
+
+
+ /**
+ * Loads the cache with the data currently available in this strategy
+ * @throws Exception
+ */
+ default void loadCache() throws Exception {
+
+ }
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/Serializable.java
b/core/camel-api/src/main/java/org/apache/camel/resume/Serializable.java
new file mode 100644
index 00000000000..1cf134eee55
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Serializable.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.resume;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * An interface that represents resumable objects that can be serialized to a
medium
+ */
+@FunctionalInterface
+public interface Serializable {
+ int TYPE_INTEGER = 0;
+ int TYPE_LONG = 1;
+ int TYPE_STRING = 2;
+ int TYPE_FILE = 3;
+
+ int BYTES = 1024;
+
+ /**
+ * Serializes this offset into a buffer of bytes
+ * @param obj the object to serialize
+ * @return a ByteBuffer instance with the serialized contents of this
object
+ */
+ default ByteBuffer serialize(Object obj) {
+ ObjectHelper.notNull(obj, "Cannot perform serialization on a null
object");
+
+ if (obj instanceof Long) {
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES +
Long.BYTES);
+ buffer.putInt(TYPE_LONG);
+
+ long data = ((Long) obj).longValue();
+ buffer.putLong(data);
+ return buffer;
+ }
+ if (obj instanceof String) {
+ byte[] data = ((String) obj).getBytes();
+
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES +
data.length);
+ buffer.putInt(TYPE_STRING);
+ buffer.put(data);
+
+ return buffer;
+ }
+ if (obj instanceof File) {
+
+ byte[] data = ((File) obj).getPath().getBytes();
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES +
data.length);
+
+ buffer.putInt(TYPE_FILE);
+ buffer.put(data);
+
+ return buffer;
+ }
+
+ return null;
+ }
+
+ /**
+ * Serializes this offset into a buffer of bytes
+ * @return a ByteBuffer instance with the serialized contents of this
object
+ */
+ ByteBuffer serialize();
+}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
index af6e682f198..52204789bc0 100644
---
a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
+++
b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
@@ -20,11 +20,9 @@ package org.apache.camel.resume;
/**
* An updatable resume strategy
*
- * @param <K> the type of the key, name or object that can be addressed by the
given offset
* @param <T> the type of the addressable value for the resumable object (for
example, a file would use a Long value)
- * @param <T> a resumable type capable of handling/storing/holding resumable
information for the addressable and offset
*/
-public interface UpdatableConsumerResumeStrategy<K, V, T extends Resumable<K,
V>> {
+public interface UpdatableConsumerResumeStrategy<T extends Resumable> {
/**
* Updates the last processed offset
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
deleted file mode 100644
index 0708d1ddff0..00000000000
---
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.resume.cache;
-
-/**
- * A cache where an entry can point to one or more entries. For instance, a
path as the key and the file entries as its
- * entries
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public interface MultiEntryCache<K, V> extends ResumeCache<K, V> {
-
-}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
index c5fbedd1fa5..a6eb7737344 100644
---
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
+++
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
@@ -17,15 +17,34 @@
package org.apache.camel.resume.cache;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
import org.apache.camel.resume.ResumeStrategy;
/**
* This cache stored the resumed data from a {@link ResumeStrategy}.
*
* @param <K> the type of the key
- * @param <V> the type of the offset value
*/
-public interface ResumeCache<K, V> {
+public interface ResumeCache<K> {
+
+ /**
+ * If the specified key is not present, compute its value from the mapping
function (like Java's standard Map one)
+ * @param key the key to get or associate with the value
+ * @param mapping the mapping function used to compute the value
+ * @return the value associated with the key (either the present or the
one computed from the mapping function)
+ */
+ Object computeIfAbsent(K key, Function<? super K, ? super Object> mapping);
+
+ /**
+ * If the specified key is present, compute a new value from the mapping
function (like Java's standard Map one)
+ * @param key the key to get or associate with the value
+ * @param remapping the remapping function used to compute the new value
+ * @return the value associated with the key (either the present or the
one computed from the mapping function)
+ */
+ Object computeIfPresent(K key, BiFunction<? super K, ? super Object, ?
super Object> remapping);
+
/**
* Whether the cache contains the key with the given entry value
@@ -34,7 +53,7 @@ public interface ResumeCache<K, V> {
* @param entry the entry
* @return true if the key/entry pair is stored in the cache
*/
- boolean contains(K key, V entry);
+ boolean contains(K key, Object entry);
/**
* Adds a value to the cache
@@ -42,7 +61,7 @@ public interface ResumeCache<K, V> {
* @param key the key to add
* @param offsetValue the offset value
*/
- void add(K key, V offsetValue);
+ void add(K key, Object offsetValue);
/**
* Checks whether the cache is full
@@ -55,4 +74,23 @@ public interface ResumeCache<K, V> {
* Gets the cache pool size
*/
long capacity();
+
+
+ /**
+ * Gets the offset entry for the key
+ *
+ * @param key the key
+ * @param clazz the class object representing the value to be obtained
+ * @return the offset value as an instance of the given class
+ */
+ <T> T get(K key, Class<T> clazz);
+
+
+ /**
+ * Gets the offset entry for the key
+ *
+ * @param key the key
+ * @return the offset value
+ */
+ Object get(K key);
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 3a9d4671088..f065ffcddc6 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -42,6 +42,7 @@ import org.apache.camel.Suspendable;
import org.apache.camel.SuspendableService;
import org.apache.camel.resume.ConsumerListener;
import org.apache.camel.resume.ConsumerListenerAware;
+import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.IdAware;
@@ -54,6 +55,7 @@ import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.support.LoggerHelper;
import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.resume.AdapterHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
@@ -641,7 +643,9 @@ public class DefaultRoute extends ServiceSupport implements
Route {
((RouteIdAware) consumer).setRouteId(this.getId());
}
- if (consumer instanceof ResumeAware) {
+ if (consumer instanceof ResumeAware && resumeStrategy != null) {
+ ResumeAdapter resumeAdapter =
AdapterHelper.eval(getCamelContext(),consumer);
+ resumeStrategy.setAdapter(resumeAdapter);
((ResumeAware) consumer).setResumeStrategy(resumeStrategy);
}
diff --git
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index bcaee49fecf..fb3a637cca5 100644
---
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -34,7 +34,8 @@ This instance can be bound in the Context registry as follows:
[source,java]
----
-getCamelContext().getRegistry().bind("testResumeStrategy", new
MyTestResumeStrategy(new MyAdapter()));
+getCamelContext().getRegistry().bind("testResumeStrategy", new
MyTestResumeStrategy());
+getCamelContext().getRegistry().bind("resumeCache", new
MyChoiceOfResumeCache<>(100));
from("some:component")
.resumable("testResumeStrategy")
@@ -45,53 +46,20 @@ Or the instance can be constructed as follows:
[source,java]
----
+getCamelContext().getRegistry().bind("resumeCache", new
MyChoiceOfResumeCache<>(100));
+
from("some:component")
- .resumable(new MyTestResumeStrategy(new MyAdapter()))
+ .resumable(new MyTestResumeStrategy())
.process(this::process)
----
-=== The Resume Adapter
-
-The adapter class responsibility is to bind the component-specific part of the
logic to the more generic handling of the
-resume strategy. The adapter is always component specific and some components
may have more than one. Integrations with
-more complex resume processes, may implement their own adapters, although the
builtin ones should be useful in most of the
-cases. Currently, the following adapters are available:
-
-* camel-atom: `org.apache.camel.component.feed.EntryFilter`
-* camel-aws2-kinesis:
`org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter`
-* camel-cassandracql:
`org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter`
-* camel-couchbase:
`org.apache.camel.component.couchbase.CouchbaseResumeAdapter`
-* camel-couchdb:
`org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter`
-* camel-file:
`org.apache.camel.component.file.consumer.adapters.FileSetResumeAdapter` for
directories
-* camel-file:
`org.apache.camel.component.file.consumer.adapters.GenericFileResumeAdapter`
for files
-* camel-kafka:
`org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter`
-* camel-rss: `org.apache.camel.component.feed.EntryFilter`
-* generic: `org.apache.camel.processor.resume.DelegatingResumeAdapter`
-
-Note: in the future, these adapters will be resolved automatically by Camel.
-
-== The Resume API Interfaces
-
-These are the *core interfaces*:
-
-* `org.apache.camel.resume.ResumeStrategy` - the resume strategy service
-* `org.apache.camel.resume.ResumeAdapter` - an adapter that binds the generic
parts of the resume strategy with the component
-* `org.apache.camel.resume.UpdatableConsumerResumeStrategy` - an extension to
the resume strategy to allow updatable strategies
-* `org.apache.camel.resume.cache.ResumeCache` - the base interface for local
cache for resumable information
-* `org.apache.camel.resume.cache.SingleEntryCache` - an interface for local
cache for resumable information where there is a one-to-one relationship
between cache a key and its entry (i.e: a file and its offset)
-* `org.apache.camel.resume.cache.MultiEntryCache` - an interface for local
cache for resumable information where there is a one-to-many relationship
between the cache a keys and its entries (i.e.: a path and its file entries)
-
-These are the *core classes* supporting the strategies:
-
-* `org.apache.camel.resume.Resumable` - an interface to allow users to work
with abstract resumable entities (files, offsets, etc)
-* `org.apache.camel.resume.ResumableSet` - an interface for resumables with a
1-to-many relationship
-* `org.apache.camel.resume.Offset` - a generic offset without a concrete type
(it may represent a long, a file name, etc)
-
-These are the *supporting classes*:
+In some circumstances, such as when dealing with File I/O, it may be necessary
to set the offset manually. There are
+*supporting classes* that can help work with resumables:
* `org.apache.camel.support.Resumables` - resumables handling support
* `org.apache.camel.support.Offsets` - offset handling support
+
== Builtin Resume Strategies
Camel comes with a few builtin strategies that can be used to store, retrieve
and update the offsets. The following strategies are available:
@@ -108,8 +76,7 @@ New builtin resume strategies can be created by implementing
the `UpdatableConsu
A sample local cache implemented using
https://github.com/ben-manes/caffeine[Caffeine].
-* `org.apache.camel.component.caffeine.resume.single.CaffeineCache`: for data
with where 1 key can only point to 1 entry (1-to-1 relationship)
-* `org.apache.camel.component.caffeine.resume.multi.CaffeineCache`: for data
with where 1 key can point to 1 or more entries (1-to-many relationship)
+* `org.apache.camel.component.caffeine.resume.CaffeineCache`
== Known Limitations
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
deleted file mode 100644
index 6aee7fd8f4f..00000000000
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.processor.resume;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Spliterator;
-import java.util.function.Consumer;
-import java.util.function.IntFunction;
-import java.util.function.Predicate;
-import java.util.function.UnaryOperator;
-import java.util.stream.Stream;
-
-import org.apache.camel.resume.ResumeAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A delegating adapter that can be used to delegate to and/or abstract resume
adapters
- */
-public class DelegatingResumeAdapter implements ResumeAdapter {
- private static final Logger LOG =
LoggerFactory.getLogger(DelegatingResumeAdapter.class);
-
- private final List<ResumeAdapter> resumeStrategies;
-
- public DelegatingResumeAdapter() {
- resumeStrategies = new ArrayList<>();
- }
-
- protected DelegatingResumeAdapter(List<ResumeAdapter> resumeStrategies) {
- this.resumeStrategies = resumeStrategies;
- }
-
- public boolean add(ResumeAdapter resumeAdapter) {
- return resumeStrategies.add(resumeAdapter);
- }
-
- public boolean remove(Object resumeAdapter) {
- return resumeStrategies.remove(resumeAdapter);
- }
-
- public boolean removeIf(Predicate<? super ResumeAdapter> filter) {
- return resumeStrategies.removeIf(filter);
- }
-
- @Override
- public void resume() {
- resumeStrategies.forEach(ResumeAdapter::resume);
- }
-
- public int size() {
- return resumeStrategies.size();
- }
-
- public boolean isEmpty() {
- return resumeStrategies.isEmpty();
- }
-
- public boolean contains(Object o) {
- return resumeStrategies.contains(o);
- }
-
- public Iterator<ResumeAdapter> iterator() {
- return resumeStrategies.iterator();
- }
-
- public Object[] toArray() {
- return resumeStrategies.toArray();
- }
-
- public <T> T[] toArray(T[] a) {
- return resumeStrategies.toArray(a);
- }
-
- public boolean containsAll(Collection<?> c) {
- return resumeStrategies.containsAll(c);
- }
-
- public boolean addAll(Collection<? extends ResumeAdapter> c) {
- return resumeStrategies.addAll(c);
- }
-
- public boolean addAll(int index, Collection<? extends ResumeAdapter> c) {
- return resumeStrategies.addAll(index, c);
- }
-
- public boolean removeAll(Collection<?> c) {
- return resumeStrategies.removeAll(c);
- }
-
- public boolean retainAll(Collection<?> c) {
- return resumeStrategies.retainAll(c);
- }
-
- public void replaceAll(UnaryOperator<ResumeAdapter> operator) {
- resumeStrategies.replaceAll(operator);
- }
-
- public void sort(Comparator<? super ResumeAdapter> c) {
- resumeStrategies.sort(c);
- }
-
- public void clear() {
- resumeStrategies.clear();
- }
-
- public ResumeAdapter get(int index) {
- return resumeStrategies.get(index);
- }
-
- public ResumeAdapter set(int index, ResumeAdapter element) {
- return resumeStrategies.set(index, element);
- }
-
- public void add(int index, ResumeAdapter element) {
- resumeStrategies.add(index, element);
- }
-
- public ResumeAdapter remove(int index) {
- return resumeStrategies.remove(index);
- }
-
- public int indexOf(Object o) {
- return resumeStrategies.indexOf(o);
- }
-
- public int lastIndexOf(Object o) {
- return resumeStrategies.lastIndexOf(o);
- }
-
- public ListIterator<ResumeAdapter> listIterator() {
- return resumeStrategies.listIterator();
- }
-
- public ListIterator<ResumeAdapter> listIterator(int index) {
- return resumeStrategies.listIterator(index);
- }
-
- public List<ResumeAdapter> subList(int fromIndex, int toIndex) {
- return resumeStrategies.subList(fromIndex, toIndex);
- }
-
- public Spliterator<ResumeAdapter> spliterator() {
- return resumeStrategies.spliterator();
- }
-
- public <T> T[] toArray(IntFunction<T[]> generator) {
- return resumeStrategies.toArray(generator);
- }
-
- public Stream<ResumeAdapter> stream() {
- return resumeStrategies.stream();
- }
-
- public Stream<ResumeAdapter> parallelStream() {
- return resumeStrategies.parallelStream();
- }
-
- public void forEach(Consumer<? super ResumeAdapter> action) {
- resumeStrategies.forEach(action);
- }
-}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index aa347da63e1..a2185ccfc65 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -48,10 +48,10 @@ public class ResumableCompletion implements Synchronization
{
Object offset =
ExchangeHelper.getResultMessage(exchange).getHeader(Exchange.OFFSET);
if (offset instanceof Resumable) {
- Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+ Resumable resumable = (Resumable) offset;
if (LOG.isTraceEnabled()) {
- LOG.trace("Processing the resumable: {}",
resumable.getAddressable());
+ LOG.trace("Processing the resumable: {}",
resumable.getOffsetKey());
LOG.trace("Processing the resumable of type: {}",
resumable.getLastOffset().offset());
}
@@ -75,14 +75,14 @@ public class ResumableCompletion implements Synchronization
{
@Override
public void onFailure(Exchange exchange) {
Exception e = exchange.getException();
- Object offset = exchange.getMessage().getHeader(Exchange.OFFSET);
+ Object resObj = exchange.getMessage().getHeader(Exchange.OFFSET);
- if (offset instanceof Resumable) {
- Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+ if (resObj instanceof Resumable) {
+ Resumable resumable = (Resumable) resObj;
String logMessage = String.format(
"Skipping offset update with address '%s' and offset value
'%s' due to failure in processing: %s",
- resumable.getAddressable(),
resumable.getLastOffset().offset(), e.getMessage());
+ resumable.getOffsetKey(),
resumable.getLastOffset().offset(), e.getMessage());
if (LOG.isDebugEnabled() || CamelLogger.shouldLog(LOG,
loggingLevel)) {
CamelLogger.log(LOG, LoggingLevel.DEBUG, logMessage, e);
@@ -93,7 +93,7 @@ public class ResumableCompletion implements Synchronization {
}
} else {
String logMessage = String.format("Skipping offset update of '%s'
due to failure in processing: %s",
- offset == null ? "type null" : "unspecified type",
e.getMessage());
+ resObj == null ? "type null" : "unspecified type",
e.getMessage());
if (LOG.isDebugEnabled() || CamelLogger.shouldLog(LOG,
loggingLevel)) {
CamelLogger.log(LOG, LoggingLevel.DEBUG, logMessage, e);
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index b0ffd26c080..205612b17e9 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -31,11 +31,21 @@ public class TransientResumeStrategy implements
ResumeStrategy {
this.resumeAdapter = resumeAdapter;
}
+ @Override
+ public void setAdapter(ResumeAdapter adapter) {
+
+ }
+
@Override
public ResumeAdapter getAdapter() {
return resumeAdapter;
}
+ @Override
+ public <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
+ return ResumeStrategy.super.getAdapter(clazz);
+ }
+
@Override
public void start() {
// this is NO-OP
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index e3ac865298f..9ffe7489dc9 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -20,15 +20,16 @@ import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.file.consumer.GenericFileResumable;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.apache.camel.resume.Resumable;
@@ -37,37 +38,57 @@ import org.apache.camel.support.resume.Resumables;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FileConsumerResumeFromOffsetStrategyTest extends
ContextTestSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(FileConsumerResumeFromOffsetStrategyTest.class);
+
+ private static class TestFileResumeAdapter implements FileResumeAdapter,
FileOffsetResumeAdapter {
+ private GenericFile<File> resumable;
+ private DirectoryEntries fileSet;
- private static class TestFileResumeAdapter implements
GenericFileResumeAdapter {
@Override
- public void resume(GenericFileResumable<File> resumable) {
- if
(!resumable.getAddressable().getName().startsWith("resume-from-offset")) {
+ public void setResumePayload(GenericFile<File> resumable) {
+ if
(!resumable.getFile().getName().startsWith("resume-from-offset")) {
throw new RuntimeCamelException("Invalid file - resume
strategy should not have been called!");
}
- resumable.updateLastOffset(3L);
+ this.resumable = resumable;
}
@Override
public void resume() {
- throw new UnsupportedOperationException("Unsupported operation");
- // NO-OP
- }
+ if (resumable != null) {
+ resumable.updateLastOffset(3L);
+ resumable = null;
+ }
- @Override
- public Optional<Long> getLastOffset(File addressable) {
- return Optional.empty();
+ if (fileSet != null) {
+ DirectoryEntries.doResume(fileSet, f ->
!f.getName().equals("resume-from-offset"));
+ LOG.debug("Fileset: {}", fileSet);
+ LOG.debug("Fileset: {}", fileSet.resumed());
+
+ fileSet = null;
+ }
}
}
- private static class FailResumeAdapter extends TestFileResumeAdapter
- implements UpdatableConsumerResumeStrategy<File, Long,
Resumable<File, Long>> {
+ private static class FailResumeAdapter implements FileResumeAdapter,
DirectoryEntriesResumeAdapter,UpdatableConsumerResumeStrategy<Resumable> {
private boolean called;
@Override
- public void updateLastOffset(Resumable<File, Long> offset) {
+ public void resume() {
+
+ }
+
+ @Override
+ public void setResumePayload(DirectoryEntries fileSet) {
+ DirectoryEntries.doResume(fileSet, f -> true);
+ }
+
+ @Override
+ public void updateLastOffset(Resumable offset) {
called = true;
}
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index aee27bb8fe7..5241e1571ec 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -24,40 +24,46 @@ import java.util.Objects;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.apache.camel.support.resume.Resumables;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisplayName("Tests whether file consumer works with the resume strategy")
public class FileConsumerResumeStrategyTest extends ContextTestSupport {
- private static class TestFileSetResumeAdapter implements
FileSetResumeAdapter {
- private List<String> processedFiles = Arrays.asList("0.txt", "1.txt",
"2.txt");
- private FileResumeSet resumeSet;
+ private static class TestFileSetResumeAdapter implements
FileResumeAdapter, DirectoryEntriesResumeAdapter {
+ private final List<String> processedFiles = Arrays.asList("0.txt",
"1.txt", "2.txt");
+ private DirectoryEntries resumeSet;
@Override
- public void resume(FileResumeSet resumeSet) {
+ public void setResumePayload(DirectoryEntries resumeSet) {
this.resumeSet = Objects.requireNonNull(resumeSet);
-
- resume();
}
@Override
public void resume() {
- if (resumeSet != null) {
- resumeSet.resumeEach(f ->
!processedFiles.contains(f.getName()));
- }
+ DirectoryEntries.doResume(resumeSet, f ->
!processedFiles.contains(f.getName()));
}
}
+ private final TestFileSetResumeAdapter adapter = new
TestFileSetResumeAdapter();
+
private static Map<String, Object> headerFor(int num) {
String name = num + ".txt";
return Map.of(Exchange.FILE_NAME, name);
}
+ @DisplayName("Tests whether it can resume processing of directory entries")
@Test
public void testResume() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
@@ -73,6 +79,10 @@ public class FileConsumerResumeStrategyTest extends
ContextTestSupport {
// only expect 4 of the 6 sent
assertMockEndpointsSatisfied();
+
+ assertTrue(adapter.resumeSet.hasResumables(), "The resume set should
have resumables in this scenario");
+ assertNotNull(adapter.resumeSet.resumed(), "The list of resumables
should not be null");
+ assertEquals(4, adapter.resumeSet.resumed().length, "There should be
exactly 4 resumables");
}
private void setOffset(Exchange exchange) {
@@ -85,12 +95,12 @@ public class FileConsumerResumeStrategyTest extends
ContextTestSupport {
}
@Override
- protected RouteBuilder createRouteBuilder() throws Exception {
+ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- bindToRegistry("testResumeStrategy", new
TransientResumeStrategy(new TestFileSetResumeAdapter()));
+ bindToRegistry("testResumeStrategy", new
TransientResumeStrategy(adapter));
from(fileUri("resume?noop=true&recursive=true"))
.resumable("testResumeStrategy")
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/resume/AdapterHelper.java
b/core/camel-support/src/main/java/org/apache/camel/support/resume/AdapterHelper.java
new file mode 100644
index 00000000000..86b066a4667
--- /dev/null
+++
b/core/camel-support/src/main/java/org/apache/camel/support/resume/AdapterHelper.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.support.resume;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AdapterHelper {
+ private static final Logger LOG =
LoggerFactory.getLogger(AdapterHelper.class);
+ private static final String ADAPTER_PROPERTIES =
"/org/apache/camel/resume/adapter.properties";
+ private static final String PROP_ADAPTER_CLASS = "adapterClass";
+
+ private AdapterHelper() {
+ }
+
+ public static ResumeAdapter eval(CamelContext context, Consumer consumer) {
+ assert context != null;
+ assert consumer != null;
+
+ Object adapterInstance =
context.getRegistry().lookupByName("resumeAdapter");
+ if (adapterInstance == null) {
+ adapterInstance = resolveAdapter(context, consumer);
+
+ if (adapterInstance == null) {
+ throw new RuntimeException("Cannot find a resume adapter class
in the consumer classpath or in the registry");
+ }
+ }
+
+ if (adapterInstance instanceof ResumeAdapter) {
+ ResumeAdapter resumeAdapter = (ResumeAdapter) adapterInstance;
+
+ Object obj = context.getRegistry().lookupByName("resumeCache");
+ if (resumeAdapter instanceof Cacheable && obj instanceof
ResumeCache) {
+ ((Cacheable) resumeAdapter).setCache((ResumeCache<?>) obj);
+ } else {
+ LOG.debug("The resume adapter {} is not cacheable",
resumeAdapter.getClass().getName());
+ }
+
+ return resumeAdapter;
+ } else {
+ LOG.error("Invalid resume adapter type: {}",
getType(adapterInstance));
+ throw new IllegalArgumentException("Invalid resume adapter type: "
+ getType(adapterInstance));
+ }
+ }
+
+ private static Object resolveAdapter(CamelContext context, Consumer
consumer) {
+ try (InputStream adapterStream =
consumer.getClass().getResourceAsStream(ADAPTER_PROPERTIES)) {
+
+ if (adapterStream == null) {
+ LOG.error("Cannot find a resume adapter class in the consumer
{} classpath", consumer.getClass());
+ return null;
+ }
+
+ Properties properties = new Properties();
+ properties.load(adapterStream);
+
+ String adapterClass = properties.getProperty(PROP_ADAPTER_CLASS);
+
+ if (ObjectHelper.isEmpty(adapterClass)) {
+ LOG.error("A resume adapter class is not defined in the
adapter configuration");
+
+ return null;
+ }
+
+ LOG.debug("About to load an adapter class {} for consumer {}",
adapterClass, consumer.getClass());
+ Class<?> clazz =
context.getClassResolver().resolveClass(adapterClass);
+ if (clazz == null) {
+ LOG.error("Cannot find the resume adapter class in the
classpath {}", adapterClass);
+
+ return null;
+ }
+
+ return clazz.getDeclaredConstructor().newInstance();
+ } catch (IOException e) {
+ LOG.error("Unable to read the resolve the resume adapter due to
I/O error: {}", e.getMessage(), e);
+ } catch (InvocationTargetException | InstantiationException |
IllegalAccessException | NoSuchMethodException e) {
+ LOG.error("Unable to create a resume adapter instance: {}",
e.getMessage(), e);
+ }
+
+ return null;
+ }
+
+ private static Object getType(Object instance) {
+ return instance == null ? "null" : instance.getClass();
+ }
+}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/resume/OffsetKeys.java
b/core/camel-support/src/main/java/org/apache/camel/support/resume/OffsetKeys.java
new file mode 100644
index 00000000000..4e3e640396d
--- /dev/null
+++
b/core/camel-support/src/main/java/org/apache/camel/support/resume/OffsetKeys.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.support.resume;
+
+import org.apache.camel.resume.OffsetKey;
+
+/**
+ * Utility class for handling offset keys
+ */
+public final class OffsetKeys {
+ /**
+ * For creating anonymous offset keys
+ *
+ * @param <T> the type of the offset key
+ */
+ private static class AnonymousOffsetKey<T> implements OffsetKey<T> {
+ private T key;
+
+ public AnonymousOffsetKey() {
+ }
+
+ public AnonymousOffsetKey(T key) {
+ this.key = key;
+ }
+
+ @Override
+ public void setKey(T key) {
+ this.key = key;
+ }
+
+ @Override
+ public T getKey() {
+ return key;
+ }
+ }
+
+ /**
+ * For creating unmodifiable offset keys
+ *
+ * @param <T> the type of the offset key
+ */
+ private static class UnmodifiableOffsetKey<T> implements OffsetKey<T> {
+ private final T key;
+
+ public UnmodifiableOffsetKey(T key) {
+ this.key = key;
+ }
+
+ @Override
+ public void setKey(T key) {
+ throw new UnsupportedOperationException("This object is
unmodifiable");
+ }
+
+ @Override
+ public T getKey() {
+ return key;
+ }
+ }
+
+ private OffsetKeys() {
+ }
+
+ /**
+ * Creates a new offset key wrapping the given object
+ *
+ * @param object the object to wrap in the offset key
+ * @return a new OffsetKey object that wraps the given object
+ * @param <T> the type of the object being wrapped
+ */
+ public static <T> OffsetKey<T> of(T object) {
+ return new AnonymousOffsetKey<>(object);
+ }
+
+ /**
+ * Creates a new unmodifiable offset key wrapping the given object
+ *
+ * @param object the object to wrap in the offset key
+ * @return a new OffsetKey object that wraps the given object. The
offset key of this object cannot be
+ * updated.
+ * @param <T> the type of the object being wrapped
+ */
+ public static <T> OffsetKey<T> unmodifiableOf(T object) {
+ return new UnmodifiableOffsetKey<>(object);
+ }
+
+ /**
+ * Creates a new empty OffsetKey object
+ *
+ * @return an empty OffsetKey object
+ */
+ public static OffsetKey<?> empty() {
+ return new AnonymousOffsetKey<>();
+ }
+}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/resume/Offsets.java
b/core/camel-support/src/main/java/org/apache/camel/support/resume/Offsets.java
index cc2b0195ac5..46403d4c66a 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/resume/Offsets.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/resume/Offsets.java
@@ -23,11 +23,23 @@ import org.apache.camel.resume.Offset;
* Offset handling support
*/
public final class Offsets {
+ private static class AnonymousOffset<T> implements Offset<T> {
+ private T offset;
- /**
- * Default initial offset when using long offsets
- */
- public static final Offset<Long> INITIAL_LONG = Offsets.of(0L);
+ public AnonymousOffset(T offset) {
+ this.offset = offset;
+ }
+
+ @Override
+ public void update(T offset) {
+ this.offset = offset;
+ }
+
+ @Override
+ public T offset() {
+ return offset;
+ }
+ }
private Offsets() {
}
@@ -40,7 +52,7 @@ public final class Offsets {
* @return A new Offset holder with the given offset value
*/
public static <T> Offset<T> of(T offsetValue) {
- return () -> offsetValue;
+ return new AnonymousOffset<>(offsetValue);
}
/**
@@ -54,7 +66,7 @@ public final class Offsets {
*/
public static <T> Offset<T> ofNullable(T offsetValue, T defaultValue) {
if (offsetValue != null) {
- return () -> offsetValue;
+ return new AnonymousOffset<>(offsetValue);
}
return Offsets.of(defaultValue);
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/resume/Resumables.java
b/core/camel-support/src/main/java/org/apache/camel/support/resume/Resumables.java
index 56698545bc6..7478735d739 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/resume/Resumables.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/resume/Resumables.java
@@ -17,7 +17,12 @@
package org.apache.camel.support.resume;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.function.Predicate;
+
import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
/**
@@ -31,7 +36,7 @@ public final class Resumables {
* @param <K> the type of the key, name or object that can be addressed by
the given offset (aka addressable)
* @param <V> the type of offset
*/
- private static class AnonymousResumable<K, V> implements Resumable<K, V> {
+ private static class AnonymousResumable<K, V> implements Resumable {
private final K addressable;
private V offset;
@@ -55,19 +60,26 @@ public final class Resumables {
this.offset = offset;
}
- @Override
- public void updateLastOffset(V offset) {
- this.offset = offset;
- }
-
@Override
public Offset<V> getLastOffset() {
return Offsets.of(offset);
}
@Override
- public K getAddressable() {
- return addressable;
+ public OffsetKey<K> getOffsetKey() {
+ return new OffsetKey<>() {
+ private final K key = addressable;
+
+ @Override
+ public void setKey(K key) {
+ throw new UnsupportedOperationException("Setting offset
keys for anonymous resumables is unsupported");
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+ };
}
}
@@ -85,7 +97,36 @@ public final class Resumables {
* @param <V> the type of offset
* @return A new resumable entity for the given addressable
with the given offset value
*/
- public static <K, V> Resumable<K, V> of(K addressable, V offset) {
+ public static <K, V> Resumable of(K addressable, V offset) {
return new AnonymousResumable<>(addressable, offset);
}
+
+ /**
+ * Iterates over the input entries, checking whether each one should be
resumed or not.
+ *
+ * @param input the input array to check for resumables
+ * @param resumableCheck a checker method that returns true if a single
entry of the input should be resumed or
+ * false otherwise. For instance: given a set A, B
and C, where B has already been processed,
+ * then a test for A and C returns true, whereas a
test for B returns false.
+ * @return a new array containing the elements that still
need to be processed
+ */
+ public static <T> T[] resumeEach(T[] input, Predicate<T> resumableCheck) {
+ @SuppressWarnings("unchecked")
+ T[] tmp = (T[]) Array.newInstance(input.getClass().getComponentType(),
input.length);
+ int count = 0;
+
+ for (T entry : input) {
+ if (resumableCheck.test(entry)) {
+ tmp[count] = entry;
+ count++;
+ }
+ }
+
+ if (count != input.length) {
+ return Arrays.copyOf(tmp, count);
+ }
+
+ return input;
+ }
+
}