This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new fb591d073d8 CAMEL-17963: added resume support for Couchbase
fb591d073d8 is described below

commit fb591d073d86f4ca331af482b399f11792b81e49
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Apr 25 16:21:24 2022 +0200

    CAMEL-17963: added resume support for Couchbase
---
 components/camel-couchbase/pom.xml                 | 10 ++-
 .../component/couchbase/CouchbaseConsumer.java     | 22 ++++-
 .../couchbase/CouchbaseResumeStrategy.java         | 44 ++++++++++
 .../integration/ConsumeResumeStrategyIT.java       | 95 ++++++++++++++++++++++
 .../docs/modules/eips/pages/resume-strategies.adoc |  1 +
 5 files changed, 168 insertions(+), 4 deletions(-)

diff --git a/components/camel-couchbase/pom.xml b/components/camel-couchbase/pom.xml
index c6f1023a28c..1540f4c28f9 100644
--- a/components/camel-couchbase/pom.xml
+++ b/components/camel-couchbase/pom.xml
@@ -31,9 +31,6 @@
     <name>Camel :: Couchbase</name>
     <description>Camel Couchbase component</description>
 
-    <properties>
-    </properties>
-
     <dependencies>
 
         <dependency>
@@ -70,6 +67,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- test infra -->
         <dependency>
             <groupId>org.apache.camel</groupId>
@@ -78,5 +81,6 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 </project>
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index e3a899e573c..1ac3bbefb05 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -26,6 +26,7 @@ import com.couchbase.client.java.view.ViewResult;
 import com.couchbase.client.java.view.ViewRow;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.ResumeAware;
 import org.apache.camel.support.DefaultScheduledPollConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_ID;
 import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_KEY;
 import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_VIEWNAME;
 
-public class CouchbaseConsumer extends DefaultScheduledPollConsumer {
+public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware<CouchbaseResumeStrategy> {
 
     private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class);
 
@@ -44,6 +45,8 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer {
     private final Collection collection;
     private ViewOptions viewOptions;
 
+    private CouchbaseResumeStrategy resumeStrategy;
+
     public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor processor) {
         super(endpoint, processor);
         this.bucket = client;
@@ -90,6 +93,13 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        if (resumeStrategy != null) {
+            LOG.info("Couchbase consumer running with resume strategy enabled");
+            resumeStrategy.setBucket(bucket);
+
+            resumeStrategy.resume();
+        }
     }
 
     @Override
@@ -170,4 +180,14 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer {
         }
 
     }
+
+    @Override
+    public CouchbaseResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    @Override
+    public void setResumeStrategy(CouchbaseResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
 }
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
new file mode 100644
index 00000000000..6e90080b8b8
--- /dev/null
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.couchbase;
+
+import com.couchbase.client.java.Bucket;
+import org.apache.camel.ResumeStrategy;
+
+/**
+ * Allow implementing resume strategies for couchbase consumers
+ */
+public interface CouchbaseResumeStrategy extends ResumeStrategy {
+
+    @Override
+    default void start() {
+
+    }
+
+    @Override
+    default void stop() {
+
+    }
+
+    /**
+     * Sets the bucket in use
+     * 
+     * @param bucket the bucket in use
+     */
+    void setBucket(Bucket bucket);
+}
diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
new file mode 100644
index 00000000000..ac7616a879e
--- /dev/null
+++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.couchbase.integration;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.couchbase.client.java.Bucket;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.couchbase.CouchbaseResumeStrategy;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.resume.Resumables;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
+    static class TestResumeStrategy implements CouchbaseResumeStrategy {
+        volatile boolean setBucketCalled;
+        volatile boolean bucketNotNull;
+        volatile boolean resumeCalled;
+
+        @Override
+        public void setBucket(Bucket bucket) {
+            setBucketCalled = true;
+            bucketNotNull = bucket != null;
+        }
+
+        @Override
+        public void resume() {
+            resumeCalled = true;
+        }
+    }
+
+    TestResumeStrategy resumeStrategy = new TestResumeStrategy();
+
+    @Test
+    public void testQueryForBeers() throws Exception {
+        for (int i = 0; i < 15; i++) {
+            cluster.bucket(bucketName).defaultCollection().upsert("DocumentID_" + i, "message" + i);
+        }
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(10);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+        await().atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(resumeStrategy.setBucketCalled,
+                        "The setBucket method should have been called"));
+        await().atMost(3, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(resumeStrategy.bucketNotNull,
+                        "The input bucket should not have been null"));
+        await().atMost(3, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> Assertions.assertTrue(resumeStrategy.resumeCalled, "The resume method should have been called"));
+    }
+
+    @AfterEach
+    public void cleanBucket() {
+        cluster.buckets().flushBucket(bucketName);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(String.format("%s&designDocumentName=%s&viewName=%s&limit=10", getConnectionUri(), bucketName, bucketName))
+                        .resumable().resumeStrategy(resumeStrategy)
+                        .setHeader(Exchange.OFFSET,
+                                constant(Resumables.of("key", ThreadLocalRandom.current().nextInt(1, 1000))))
+                        .log("message received")
+                        .to("mock:result");
+            }
+        };
+
+    }
+}
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index c83b37d5aab..8176f7d6a52 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -14,6 +14,7 @@ Support for resume varies according to the component. Initially, the support is
 * xref:components::atom-component.adoc[camel-atom]
 * xref:components::aws2-kinesis-component.adoc[camel-aws2-kinesis]
 * xref:components::cql-component.adoc[camel-cassandracql]
+* xref:components::couchbase-component.adoc[camel-couchbase]
 * xref:components::couchdb-component.adoc[camel-couchdb]
 * xref:components::file-component.adoc[camel-file]
 * xref:components::kafka-component.adoc[camel-kafka]

Reply via email to