This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit abd1350f1da60d1fa02295f27b8828978a12e61d
Author: Kévin Lacire <[email protected]>
AuthorDate: Fri Apr 8 15:32:04 2022 +0200

    :sparkles: use of timer & task to handle timeout batch's completion condition
---
 .../EventHubsCheckpointUpdaterTimerTask.java       | 54 +++++++++++++++++
 .../azure/eventhubs/EventHubsConstants.java        | 13 +++-
 .../azure/eventhubs/EventHubsConsumer.java         | 69 ++++++++++++++--------
 3 files changed, 111 insertions(+), 25 deletions(-)

diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
new file mode 100644
index 00000000000..19c14c05e75
--- /dev/null
+++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.azure.messaging.eventhubs.models.EventContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubsCheckpointUpdaterTimerTask extends TimerTask {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventHubsCheckpointUpdaterTimerTask.class);
+
+    private EventContext eventContext;
+    private final AtomicInteger processedEvents;
+
+    public EventHubsCheckpointUpdaterTimerTask(EventContext eventContext, AtomicInteger processedEvents) {
+        super();
+
+        this.eventContext = eventContext;
+        this.processedEvents = processedEvents;
+    }
+
+    @Override
+    public void run() {
+        if (processedEvents.get() > 0) {
+            LOG.debug("checkpointing offset after reaching timeout, with a batch of {}", processedEvents.get());
+            eventContext.updateCheckpoint();
+            processedEvents.set(0);
+        } else {
+            LOG.debug("skip checkpointing offset even if timeout is reached. No events processed");
+        }
+    }
+
+    public void setEventContext(EventContext eventContext) {
+        this.eventContext = eventContext;
+    }
+}
diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java
index b746c12d076..f1f07ea567a 100644
--- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java
+++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java
@@ -57,9 +57,20 @@ public final class EventHubsConstants {
     public static final String MESSAGE_TIMESTAMP = Exchange.MESSAGE_TIMESTAMP;
     @Metadata(label = "consumer",
               description = "It sets the reason for the checkpoint to have been updated. This is only present on a received `EventData`.",
-              javaType = "Boolean")
+              javaType = "String")
     public static final String CHECKPOINT_UPDATED_BY = HEADER_PREFIX + "CheckpointUpdatedBy";
 
+    @Metadata(label = "consumer",
+              description = "Completion condition header's value for the checkpoint to be updated when batch size is reached",
+              javaType = "String")
+    public static final String COMPLETED_BY_SIZE = "size";
+
+    @Metadata(label = "consumer",
+              description = "Completion condition header's value for the checkpoint to be updated when batch timeout is reached",
+              javaType = "String")
+    public static final String COMPLETED_BY_TIMEOUT = "timeout";
+    public static final String UNCOMPLETED = "uncompleted";
+
     private EventHubsConstants() {
     }
 }
diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index 917131c33d9..22616c705f2 100644
--- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.azure.eventhubs;
 
+import java.util.Timer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.azure.messaging.eventhubs.EventProcessorClient;
@@ -32,31 +33,31 @@ import org.apache.camel.support.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.azure.eventhubs.EventHubsConstants.*;
+
 public class EventHubsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(EventHubsConsumer.class);
 
-    private static final String COMPLETED_BY_SIZE = "size";
-    private static final String COMPLETED_BY_TIMEOUT = "timeout";
-
     // we use the EventProcessorClient as recommended by Azure docs to consume from all partitions
     private EventProcessorClient processorClient;
 
-    private AtomicInteger processedEvents;
+    private final AtomicInteger processedEvents;
+    private final Timer timer;
 
-    private Long checkpointCompletionTimeout;
+    private EventHubsCheckpointUpdaterTimerTask lastTask;
 
     public EventHubsConsumer(final EventHubsEndpoint endpoint, final Processor processor) {
         super(endpoint, processor);
+
+        this.processedEvents = new AtomicInteger(0);
+        this.timer = new Timer();
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
 
-        checkpointCompletionTimeout = System.currentTimeMillis();
-        processedEvents = new AtomicInteger(0);
-
         // create the client
         processorClient = EventHubsClientFactory.createEventProcessorClient(getConfiguration(),
                 this::onEventListener, this::onErrorListener);
@@ -157,13 +158,25 @@ public class EventHubsConsumer extends DefaultConsumer {
      * @param exchange the exchange
      */
     private void processCommit(final Exchange exchange, final EventContext eventContext) {
+        if (lastTask == null || System.currentTimeMillis() > lastTask.scheduledExecutionTime()) {
+            lastTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, processedEvents);
+            // delegate the checkpoint update to a dedicated Thread
+            timer.schedule(lastTask, getConfiguration().getCheckpointBatchTimeout());
+        } else {
+            // updates the eventContext to use for the offset to be the most accurate
+            lastTask.setEventContext(eventContext);
+        }
+
         try {
-            if (processCheckpoint(exchange, eventContext)) {
+            var completionCondition = processCheckpoint(exchange);
+            if (completionCondition.equals(COMPLETED_BY_SIZE)) {
                 eventContext.updateCheckpoint();
                 processedEvents.set(0);
-            } else {
+            } else if (!completionCondition.equals(COMPLETED_BY_TIMEOUT)) {
                 processedEvents.incrementAndGet();
             }
+            // we assume that the timer task has done the update by its side
+
         } catch (Exception ex) {
             getExceptionHandler().handleException("Error occurred during updating the checkpoint. This exception is ignored.",
                     exchange, ex);
@@ -183,29 +196,37 @@ public class EventHubsConsumer extends DefaultConsumer {
     }
 
     /**
-     * Checks whether we need to update the checkpoint or not
+     * Checks either the batch size or the batch timeout is reached
      *
      * @param  exchange the exchange
-     *
-     * @return          true if at least one of the two conditions (batch size or batch timeout) are met, else false
+     * @return          the completion condition (batch size or batch timeout) if one of them is reached, else the
+     *                  'uncompleted' state adds a header {@value EventHubsConstants#CHECKPOINT_UPDATED_BY} with the
+     *                  completion condition (
      */
-    private boolean processCheckpoint(Exchange exchange, EventContext eventContext) {
-        var checkpointByBatchSize = processedEvents.get() % getEndpoint().getConfiguration().getCheckpointBatchSize() == 0;
-        if (checkpointByBatchSize) {
+    private String processCheckpoint(Exchange exchange) {
+        // Check if the batch size is reached
+        if (processedEvents.get() % getConfiguration().getCheckpointBatchSize() == 0) {
             exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_SIZE);
-        }
+            LOG.debug("eventhub consumer batch size of reached");
+            // no need to run task if the batch size already did the checkpointing
+            if (lastTask != null) {
+                lastTask.cancel();
+            }
 
-        var now = System.currentTimeMillis();
-        var checkpointByBatchTimeout = false;
-        if (now >= checkpointCompletionTimeout + getEndpoint().getConfiguration().getCheckpointBatchTimeout()) {
-            checkpointCompletionTimeout = now;
-            checkpointByBatchTimeout = true;
+            return COMPLETED_BY_SIZE;
+        } else {
+            LOG.debug("eventhub consumer batch size of {}/{} not reached yet", processedEvents.get(),
+                    getConfiguration().getCheckpointBatchSize());
         }
 
-        if (checkpointByBatchTimeout) {
+        // Check if the batch timeout is reached
+        if (System.currentTimeMillis() >= lastTask.scheduledExecutionTime()) {
             exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_TIMEOUT);
+            LOG.debug("eventhub consumer batch timeout reached");
+
+            return COMPLETED_BY_TIMEOUT;
         }
 
-        return checkpointByBatchSize || checkpointByBatchTimeout;
+        return UNCOMPLETED;
     }
 }

Reply via email to