This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9d581cae407 CAMEL-18127: added adapter auto-configuration for Cassandra
9d581cae407 is described below
commit 9d581cae407794f55cace2c7bb3a8734db036172
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Jun 6 13:16:20 2022 +0200
CAMEL-18127: added adapter auto-configuration for Cassandra
---
.../org/apache/camel/catalog/components/cql.json | 3 +-
.../component/caffeine/resume/CaffeineCache.java | 13 ++++
.../org/apache/camel/component/cassandra/cql.json | 3 +-
.../component/cassandra/CassandraConstants.java | 3 +
.../component/cassandra/CassandraConsumer.java | 15 ++++-
...sumeAdapter.java => CassandraResumeAction.java} | 16 ++---
.../consumer/support/CassandraResumeAdapter.java | 9 ++-
.../support/DefaultCassandraResumeAdapter.java | 70 ++++++++++++++++++++++
.../org/apache/camel/resume/adapter.properties | 19 ++++++
.../CassandraComponentResumeStrategyIT.java | 30 +++-------
.../org/apache/camel/resume/cache/ResumeCache.java | 7 +++
11 files changed, 149 insertions(+), 39 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
index 0938f6873be..2a568f0f551 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
@@ -27,7 +27,8 @@
"autowiredEnabled": { "kind": "property", "displayName": "Autowired
Enabled", "group": "advanced", "label": "advanced", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "description": "Whether autowiring is
enabled. This is used for automatic autowiring options (the option must be
marked as autowired) by looking up in the registry to find if there is a single
instance of matching type, which t [...]
},
"headers": {
- "CamelCqlQuery": { "kind": "header", "displayName": "", "group":
"producer", "label": "producer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The CQL query to execute.", "constantName":
"org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" }
+ "CamelCqlQuery": { "kind": "header", "displayName": "", "group":
"producer", "label": "producer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The CQL query to execute.", "constantName":
"org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" },
+ "CamelCqlResumeQuery": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The CQL query to execute when resuming.",
"constantName":
"org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION"
}
},
"properties": {
"beanRef": { "kind": "path", "displayName": "Bean Ref", "group": "common",
"label": "", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "beanRef is defined using bean:id" },
diff --git
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
index 5ec66ddc88b..30d1de9fd54 100644
---
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
+++
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
@@ -18,6 +18,7 @@
package org.apache.camel.component.caffeine.resume;
import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -121,4 +122,16 @@ public class CaffeineCache<K> implements ResumeCache<K> {
public long capacity() {
return cacheSize;
}
+
+ @Override
+ public void forEach(BiFunction<? super K, ? super Object, Boolean> action)
{
+
+ final ConcurrentMap<K, Object> kObjectConcurrentMap = cache.asMap();
+ for (var entry : kObjectConcurrentMap.entrySet()) {
+ final boolean invalidate = action.apply(entry.getKey(),
entry.getValue());
+ if (invalidate) {
+ cache.invalidate(entry.getKey());
+ }
+ }
+ }
}
diff --git
a/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
b/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
index 0938f6873be..2a568f0f551 100644
---
a/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
+++
b/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
@@ -27,7 +27,8 @@
"autowiredEnabled": { "kind": "property", "displayName": "Autowired
Enabled", "group": "advanced", "label": "advanced", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "description": "Whether autowiring is
enabled. This is used for automatic autowiring options (the option must be
marked as autowired) by looking up in the registry to find if there is a single
instance of matching type, which t [...]
},
"headers": {
- "CamelCqlQuery": { "kind": "header", "displayName": "", "group":
"producer", "label": "producer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The CQL query to execute.", "constantName":
"org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" }
+ "CamelCqlQuery": { "kind": "header", "displayName": "", "group":
"producer", "label": "producer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The CQL query to execute.", "constantName":
"org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" },
+ "CamelCqlResumeQuery": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The CQL query to execute when resuming.",
"constantName":
"org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION"
}
},
"properties": {
"beanRef": { "kind": "path", "displayName": "Bean Ref", "group": "common",
"label": "", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "beanRef is defined using bean:id" },
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
index 6dc155250c2..83323ddcf80 100644
---
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
@@ -29,6 +29,9 @@ public final class CassandraConstants {
@Metadata(label = "producer", description = "The CQL query to execute.",
javaType = "String")
public static final String CQL_QUERY = "CamelCqlQuery";
+ @Metadata(label = "consumer", description = "The CQL query to execute when
resuming.", javaType = "String")
+ public static final String CASSANDRA_RESUME_ACTION = "CamelCqlResumeQuery";
+
private CassandraConstants() {
}
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index f3bb3bf971e..9d396d4bd6b 100644
---
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -22,10 +22,14 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import
org.apache.camel.component.cassandra.consumer.support.CassandraResumeAction;
import
org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.ScheduledPollConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+import static
org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION;
/**
* Cassandra 2 CQL3 consumer.
@@ -79,20 +83,25 @@ public class CassandraConsumer extends
ScheduledPollConsumer implements ResumeAw
@Override
protected void doStart() throws Exception {
- super.doStart();
if (isPrepareStatements()) {
preparedStatement = getEndpoint().prepareStatement();
}
if (resumeStrategy != null) {
- CqlSession session = getEndpoint().getSessionHolder().getSession();
+ resumeStrategy.loadCache();
CassandraResumeAdapter resumeAdapter =
resumeStrategy.getAdapter(CassandraResumeAdapter.class);
if (resumeAdapter != null) {
- resumeAdapter.setSession(session);
+ CassandraResumeAction action = (CassandraResumeAction)
getEndpoint().getCamelContext().getRegistry()
+ .lookupByName(CASSANDRA_RESUME_ACTION);
+ ObjectHelper.notNull(action, "The resume action cannot be
null", this);
+
+ resumeAdapter.setResumeAction(action);
resumeAdapter.resume();
}
}
+
+ super.doStart();
}
@Override
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java
similarity index 70%
copy from
components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
copy to
components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java
index e8ccbed8efe..73c691ee5a5 100644
---
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java
@@ -17,18 +17,18 @@
package org.apache.camel.component.cassandra.consumer.support;
-import com.datastax.oss.driver.api.core.CqlSession;
-import org.apache.camel.resume.ResumeAdapter;
-
/**
- * Provides a resume adapter for Cassandra consumers
+ * Provides an interface for integrations to run actions during resume
*/
-public interface CassandraResumeAdapter extends ResumeAdapter {
+public interface CassandraResumeAction {
/**
- * Sets the session that allow implementations to run a one-time query on
the DB
+ * Runs an action on a resumable (entry)
*
- * @param session
+ * @param key the resumable key
+ * @param value the resumable value
+ *
+ * @return true if the entry addressed should be invalidated or
false otherwise
*/
- void setSession(CqlSession session);
+ boolean evalEntry(Object key, Object value);
}
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
index e8ccbed8efe..7de5f5958c9 100644
---
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.cassandra.consumer.support;
-import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.camel.resume.ResumeAdapter;
/**
@@ -26,9 +25,9 @@ import org.apache.camel.resume.ResumeAdapter;
public interface CassandraResumeAdapter extends ResumeAdapter {
/**
- * Sets the session that allow implementations to run a one-time query on
the DB
- *
- * @param session
+ * Sets an action that will be executed during resume
+ *
+ * @param resumeAction the action to execute during resume
*/
- void setSession(CqlSession session);
+ void setResumeAction(CassandraResumeAction resumeAction);
}
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
new file mode 100644
index 00000000000..62a3dab489f
--- /dev/null
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cassandra.consumer.support;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.cache.ResumeCache;
+
+public class DefaultCassandraResumeAdapter implements CassandraResumeAdapter,
Cacheable, Deserializable {
+ private ResumeCache<Object> cache;
+ private CassandraResumeAction resumeAction;
+
+ @Override
+ public void setResumeAction(CassandraResumeAction resumeAction) {
+ this.resumeAction = resumeAction;
+ }
+
+ @Override
+ public void resume() {
+ cache.forEach(resumeAction::evalEntry);
+ }
+
+ private boolean add(Object key, Object offset) {
+ cache.add(key, offset);
+
+ return true;
+ }
+
+ @Override
+ public boolean add(OffsetKey<?> key, Offset<?> offset) {
+ return add(key.getValue(), offset.getValue());
+ }
+
+ @Override
+ public void setCache(ResumeCache<?> cache) {
+ this.cache = (ResumeCache<Object>) cache;
+ }
+
+ @Override
+ public ResumeCache<?> getCache() {
+ return cache;
+ }
+
+ @Override
+ public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+ Object key = deserializeObject(keyBuffer);
+ Object value = deserializeObject(valueBuffer);
+
+ return add(key, value);
+ }
+}
diff --git
a/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
b/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
new file mode 100644
index 00000000000..59898d3e1ac
--- /dev/null
+++
b/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+
+adapterClass=org.apache.camel.component.cassandra.consumer.support.DefaultCassandraResumeAdapter
diff --git
a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
index b41a2a2ef33..ee8647202c0 100644
---
a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
+++
b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
@@ -20,46 +20,33 @@ package org.apache.camel.component.cassandra.integration;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import
org.apache.camel.component.cassandra.consumer.support.CassandraResumeAction;
import
org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.junit.jupiter.api.Test;
+import static
org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CassandraComponentResumeStrategyIT extends BaseCassandra {
private static class TestCassandraResumeAdapter implements
CassandraResumeAdapter {
- private boolean sessionCalled;
- private boolean sessionNotNull;
private boolean resumeCalled;
+ private boolean resumeActionNotNull;
@Override
- public void setSession(CqlSession session) {
- sessionCalled = true;
- sessionNotNull = session != null;
+ public void setResumeAction(CassandraResumeAction action) {
+ resumeActionNotNull = action != null;
}
@Override
public void resume() {
resumeCalled = true;
}
-
- public boolean isSessionCalled() {
- return sessionCalled;
- }
-
- public boolean isSessionNotNull() {
- return sessionNotNull;
- }
-
- public boolean isResumeCalled() {
- return resumeCalled;
- }
}
private static final String CQL = "select login, first_name, last_name
from camel_user";
@@ -79,15 +66,16 @@ public class CassandraComponentResumeStrategyIT extends
BaseCassandra {
mock.await(1, TimeUnit.SECONDS);
assertMockEndpointsSatisfied();
- assertTrue(resumeStrategy.isSessionCalled());
- assertTrue(resumeStrategy.isSessionNotNull());
- assertTrue(resumeStrategy.isResumeCalled());
+ assertTrue(resumeStrategy.resumeActionNotNull);
+ assertTrue(resumeStrategy.resumeCalled);
}
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
+ bindToRegistry(CASSANDRA_RESUME_ACTION,
(CassandraResumeAction) (key, value) -> true);
+
fromF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)
.resumable(new TransientResumeStrategy(resumeStrategy))
.to("mock:resultAll");
diff --git
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
index 090492b32b4..9791287579e 100644
---
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
+++
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
@@ -17,6 +17,7 @@
package org.apache.camel.resume.cache;
+import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -94,4 +95,10 @@ public interface ResumeCache<K> {
* @return the offset value
*/
Object get(K key);
+
+ /**
+ * Performs the given action for each member of the cache
+ * @param action the action to execute; a true result requests invalidation of that entry
+ */
+ void forEach(BiFunction<? super K, ? super Object, Boolean> action);
}