This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 01ec3d9980cbe0f00c41a71e9583e032f250986f
Author: Kévin LACIRE <[email protected]>
AuthorDate: Wed Mar 23 17:40:19 2022 +0100

    add checkpoint batch size & batch timeout parameters on 
camel-azure-eventhubs component
---
 .../component/azure/eventhubs/azure-eventhubs.json |  6 ++-
 .../azure/eventhubs/EventHubsConfiguration.java    | 19 +++++++++
 .../azure/eventhubs/EventHubsConstants.java        |  4 ++
 .../azure/eventhubs/EventHubsConsumer.java         | 46 +++++++++++++++++++++-
 .../azure/eventhubs/EventHubsComponentTest.java    |  5 ++-
 5 files changed, 76 insertions(+), 4 deletions(-)

diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
 
b/components/camel-azure/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
index 3a672a2648a..229e2a712c1 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
@@ -41,7 +41,8 @@
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired 
Enabled", "group": "advanced", "label": "advanced", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "description": "Whether autowiring is 
enabled. This is used for automatic autowiring options (the option must be 
marked as autowired) by looking up in the registry to find if there is a single 
instance of matching type, which t [...]
     "connectionString": { "kind": "property", "displayName": "Connection 
String", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "Instead of supplying 
namespace, sharedAccessKey, sharedAccessName ... etc, you can just supply t 
[...]
     "sharedAccessKey": { "kind": "property", "displayName": "Shared Access 
Key", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "The generated value for 
the SharedAccessName" },
-    "sharedAccessName": { "kind": "property", "displayName": "Shared Access 
Name", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "The name you chose for 
your EventHubs SAS keys" }
+    "sharedAccessName": { "kind": "property", "displayName": "Shared Access 
Name", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "The name you chose for 
your EventHubs SAS keys" },
+    "checkpointBatchSize": { "kind": "property", "displayName": "Checkpoint 
Batch Size", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 500, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "Sets the count used by 
the receiver to control the number of events the Event H [...]
   },
   "headers": {
     "CamelAzureEventHubsPartitionKey": { "kind": "header", "displayName": "", 
"group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "(producer) Overrides the hashing key to be provided for 
the batch of events, which instructs the Event Hubs service to map this key to 
a specific partition. (consumer) It sets the partition hashing key if it was 
set when originally publishing t [...]
@@ -74,6 +75,7 @@
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start 
Producer", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": false, 
"description": "Whether the producer should be started lazy (on the first 
message). By starting lazy you can use this to allow CamelContext and routes to 
startup in situations where a producer may other [...]
     "connectionString": { "kind": "parameter", "displayName": "Connection 
String", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "Instead of supplying 
namespace, sharedAccessKey, sharedAccessName ... etc, you can just supply  [...]
     "sharedAccessKey": { "kind": "parameter", "displayName": "Shared Access 
Key", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": true, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "The generated value for 
the SharedAccessName" },
-    "sharedAccessName": { "kind": "parameter", "displayName": "Shared Access 
Name", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "The name you chose for 
your EventHubs SAS keys" }
+    "sharedAccessName": { "kind": "parameter", "displayName": "Shared Access 
Name", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "The name you chose for 
your EventHubs SAS keys" },
+    "checkpointBatchSize": { "kind": "parameter", "displayName": "Checkpoint 
Batch Size", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 500, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "Sets the count used by 
the receiver to control the number of events the Event  [...]
   }
 }
diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConfiguration.java
 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConfiguration.java
index 6a4b24fdbf0..7018de1d1ca 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConfiguration.java
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConfiguration.java
@@ -66,6 +66,10 @@ public class EventHubsConfiguration implements Cloneable {
     private StorageSharedKeyCredential blobStorageSharedKeyCredential;
     @UriParam(label = "consumer")
     private Map<String, EventPosition> eventPosition = new HashMap<>();
+    @UriParam(label = "consumer", defaultValue = "500")
+    private int checkpointBatchSize = 500;
+    @UriParam(label = "consumer", defaultValue = "5000")
+    private int checkpointBatchTimeout = 5000;
     @UriParam(label = "producer")
     @Metadata(autowired = true)
     private EventHubProducerAsyncClient producerAsyncClient;
@@ -311,6 +315,21 @@ public class EventHubsConfiguration implements Cloneable {
         this.eventPosition = eventPosition;
     }
 
+    public int getCheckpointBatchSize() {
+        return checkpointBatchSize;
+    }
+
+    public void setCheckpointBatchSize(int checkpointBatchSize) {
+        this.checkpointBatchSize = checkpointBatchSize;
+    }
+
+    public int getCheckpointBatchTimeout() {
+        return checkpointBatchTimeout;
+    }
+
+    public void setCheckpointBatchTimeout(int checkpointBatchTimeout) {
+        this.checkpointBatchTimeout = checkpointBatchTimeout;
+    }
     // *************************************************
     //
     // *************************************************
diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java
 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java
index 889377dbe22..b746c12d076 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java
@@ -55,6 +55,10 @@ public final class EventHubsConstants {
     public static final String METADATA = HEADER_PREFIX + "Metadata";
     @Metadata(label = "consumer", description = "The timestamp of the 
message", javaType = "long")
     public static final String MESSAGE_TIMESTAMP = Exchange.MESSAGE_TIMESTAMP;
+    @Metadata(label = "consumer",
+              description = "It sets the reason for the checkpoint to have 
been updated. This is only present on a received `EventData`.",
+              javaType = "Boolean")
+    public static final String CHECKPOINT_UPDATED_BY = HEADER_PREFIX + 
"CheckpointUpdatedBy";
 
     private EventHubsConstants() {
     }
diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index ef3230383b1..917131c33d9 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.azure.eventhubs;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import com.azure.messaging.eventhubs.EventProcessorClient;
 import com.azure.messaging.eventhubs.models.ErrorContext;
 import com.azure.messaging.eventhubs.models.EventContext;
@@ -34,9 +36,16 @@ public class EventHubsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(EventHubsConsumer.class);
 
+    private static final String COMPLETED_BY_SIZE = "size";
+    private static final String COMPLETED_BY_TIMEOUT = "timeout";
+
     // we use the EventProcessorClient as recommended by Azure docs to consume 
from all partitions
     private EventProcessorClient processorClient;
 
+    private AtomicInteger processedEvents;
+
+    private Long checkpointCompletionTimeout;
+
     public EventHubsConsumer(final EventHubsEndpoint endpoint, final Processor 
processor) {
         super(endpoint, processor);
     }
@@ -45,6 +54,9 @@ public class EventHubsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
+        checkpointCompletionTimeout = System.currentTimeMillis();
+        processedEvents = new AtomicInteger(0);
+
         // create the client
         processorClient = 
EventHubsClientFactory.createEventProcessorClient(getConfiguration(),
                 this::onEventListener, this::onErrorListener);
@@ -146,7 +158,12 @@ public class EventHubsConsumer extends DefaultConsumer {
      */
     private void processCommit(final Exchange exchange, final EventContext 
eventContext) {
         try {
-            eventContext.updateCheckpoint();
+            if (processCheckpoint(exchange, eventContext)) {
+                eventContext.updateCheckpoint();
+                processedEvents.set(0);
+            } else {
+                processedEvents.incrementAndGet();
+            }
         } catch (Exception ex) {
             getExceptionHandler().handleException("Error occurred during 
updating the checkpoint. This exception is ignored.",
                     exchange, ex);
@@ -164,4 +181,31 @@ public class EventHubsConsumer extends DefaultConsumer {
             getExceptionHandler().handleException("Error during processing 
exchange.", exchange, cause);
         }
     }
+
+    /**
+     * Checks whether we need to update the checkpoint or not
+     *
+     * @param  exchange the exchange
+     *
+     * @return          true if at least one of the two conditions (batch size 
or batch timeout) are met, else false
+     */
+    private boolean processCheckpoint(Exchange exchange, EventContext 
eventContext) {
+        var checkpointByBatchSize = processedEvents.get() % 
getEndpoint().getConfiguration().getCheckpointBatchSize() == 0;
+        if (checkpointByBatchSize) {
+            
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, 
COMPLETED_BY_SIZE);
+        }
+
+        var now = System.currentTimeMillis();
+        var checkpointByBatchTimeout = false;
+        if (now >= checkpointCompletionTimeout + 
getEndpoint().getConfiguration().getCheckpointBatchTimeout()) {
+            checkpointCompletionTimeout = now;
+            checkpointByBatchTimeout = true;
+        }
+
+        if (checkpointByBatchTimeout) {
+            
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, 
COMPLETED_BY_TIMEOUT);
+        }
+
+        return checkpointByBatchSize || checkpointByBatchTimeout;
+    }
 }
diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsComponentTest.java
 
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsComponentTest.java
index 1f6b7f9124e..fa2f4e17872 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsComponentTest.java
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsComponentTest.java
@@ -111,7 +111,8 @@ class EventHubsComponentTest extends CamelTestSupport {
     public void testCreateEndpointWithConfig() {
         final String uri = 
"azure-eventhubs:namespace/hubName?sharedAccessName=DummyAccessKeyName"
                            + "&sharedAccessKey=DummyKey"
-                           + 
"&consumerGroupName=testConsumer&prefetchCount=100";
+                           + 
"&consumerGroupName=testConsumer&prefetchCount=100"
+                           + 
"&checkpointBatchSize=100&checkpointBatchTimeout=1000";
 
         final EventHubsEndpoint endpoint = (EventHubsEndpoint) 
context.getEndpoint(uri);
 
@@ -121,6 +122,8 @@ class EventHubsComponentTest extends CamelTestSupport {
         assertEquals("DummyAccessKeyName", 
endpoint.getConfiguration().getSharedAccessName());
         assertEquals("DummyKey", 
endpoint.getConfiguration().getSharedAccessKey());
         assertEquals(100, endpoint.getConfiguration().getPrefetchCount());
+        assertEquals(100, 
endpoint.getConfiguration().getCheckpointBatchSize());
+        assertEquals(1000, 
endpoint.getConfiguration().getCheckpointBatchTimeout());
     }
 
     private String getErrorMessage(final String uri) {

Reply via email to