This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 01ec3d9980cbe0f00c41a71e9583e032f250986f Author: Kévin LACIRE <[email protected]> AuthorDate: Wed Mar 23 17:40:19 2022 +0100 add checkpoint batch size & batch timeout parameters on camel-azure-eventhubs component --- .../component/azure/eventhubs/azure-eventhubs.json | 6 ++- .../azure/eventhubs/EventHubsConfiguration.java | 19 +++++++++ .../azure/eventhubs/EventHubsConstants.java | 4 ++ .../azure/eventhubs/EventHubsConsumer.java | 46 +++++++++++++++++++++- .../azure/eventhubs/EventHubsComponentTest.java | 5 ++- 5 files changed, 76 insertions(+), 4 deletions(-) diff --git a/components/camel-azure/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json b/components/camel-azure/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json index 3a672a2648a..229e2a712c1 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json +++ b/components/camel-azure/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json @@ -41,7 +41,8 @@ "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] 
"connectionString": { "kind": "property", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "Instead of supplying namespace, sharedAccessKey, sharedAccessName ... etc, you can just supply t [...] "sharedAccessKey": { "kind": "property", "displayName": "Shared Access Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "The generated value for the SharedAccessName" }, - "sharedAccessName": { "kind": "property", "displayName": "Shared Access Name", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "The name you chose for your EventHubs SAS keys" } + "sharedAccessName": { "kind": "property", "displayName": "Shared Access Name", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "The name you chose for your EventHubs SAS keys" }, + "checkpointBatchSize": { "kind": "property", "displayName": "Checkpoint Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "Sets the count used by the receiver to control the number of events the Event H [...] }, "headers": { "CamelAzureEventHubsPartitionKey": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "(producer) Overrides the hashing key to be provided for the batch of events, which instructs the Event Hubs service to map this key to a specific partition. (consumer) It sets the partition hashing key if it was set when originally publishing t [...] @@ -74,6 +75,7 @@ "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may other [...] "connectionString": { "kind": "parameter", "displayName": "Connection String", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "Instead of supplying namespace, sharedAccessKey, sharedAccessName ... etc, you can just supply [...] 
"sharedAccessKey": { "kind": "parameter", "displayName": "Shared Access Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "The generated value for the SharedAccessName" }, - "sharedAccessName": { "kind": "parameter", "displayName": "Shared Access Name", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "The name you chose for your EventHubs SAS keys" } + "sharedAccessName": { "kind": "parameter", "displayName": "Shared Access Name", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "The name you chose for your EventHubs SAS keys" }, + "checkpointBatchSize": { "kind": "parameter", "displayName": "Checkpoint Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "configurationClass": "org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", "configurationField": "configuration", "description": "Sets the count used by the receiver to control the number of events the Event [...] 
} } diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConfiguration.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConfiguration.java index 6a4b24fdbf0..7018de1d1ca 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConfiguration.java +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConfiguration.java @@ -66,6 +66,10 @@ public class EventHubsConfiguration implements Cloneable { private StorageSharedKeyCredential blobStorageSharedKeyCredential; @UriParam(label = "consumer") private Map<String, EventPosition> eventPosition = new HashMap<>(); + @UriParam(label = "consumer", defaultValue = "500") + private int checkpointBatchSize = 500; + @UriParam(label = "consumer", defaultValue = "5000") + private int checkpointBatchTimeout = 5000; @UriParam(label = "producer") @Metadata(autowired = true) private EventHubProducerAsyncClient producerAsyncClient; @@ -311,6 +315,21 @@ public class EventHubsConfiguration implements Cloneable { this.eventPosition = eventPosition; } + public int getCheckpointBatchSize() { + return checkpointBatchSize; + } + + public void setCheckpointBatchSize(int checkpointBatchSize) { + this.checkpointBatchSize = checkpointBatchSize; + } + + public int getCheckpointBatchTimeout() { + return checkpointBatchTimeout; + } + + public void setCheckpointBatchTimeout(int checkpointBatchTimeout) { + this.checkpointBatchTimeout = checkpointBatchTimeout; + } // ************************************************* // // ************************************************* diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java index 889377dbe22..b746c12d076 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java @@ -55,6 +55,10 @@ public final class EventHubsConstants { public static final String METADATA = HEADER_PREFIX + "Metadata"; @Metadata(label = "consumer", description = "The timestamp of the message", javaType = "long") public static final String MESSAGE_TIMESTAMP = Exchange.MESSAGE_TIMESTAMP; + @Metadata(label = "consumer", + description = "It sets the reason for the checkpoint to have been updated. This is only present on a received `EventData`.", + javaType = "Boolean") + public static final String CHECKPOINT_UPDATED_BY = HEADER_PREFIX + "CheckpointUpdatedBy"; private EventHubsConstants() { } diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java index ef3230383b1..917131c33d9 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.azure.eventhubs; +import java.util.concurrent.atomic.AtomicInteger; + import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventContext; @@ -34,9 +36,16 @@ public class EventHubsConsumer extends DefaultConsumer { private static final 
Logger LOG = LoggerFactory.getLogger(EventHubsConsumer.class); + private static final String COMPLETED_BY_SIZE = "size"; + private static final String COMPLETED_BY_TIMEOUT = "timeout"; + // we use the EventProcessorClient as recommended by Azure docs to consume from all partitions private EventProcessorClient processorClient; + private AtomicInteger processedEvents; + + private Long checkpointCompletionTimeout; + public EventHubsConsumer(final EventHubsEndpoint endpoint, final Processor processor) { super(endpoint, processor); } @@ -45,6 +54,9 @@ public class EventHubsConsumer extends DefaultConsumer { protected void doStart() throws Exception { super.doStart(); + checkpointCompletionTimeout = System.currentTimeMillis(); + processedEvents = new AtomicInteger(0); + // create the client processorClient = EventHubsClientFactory.createEventProcessorClient(getConfiguration(), this::onEventListener, this::onErrorListener); @@ -146,7 +158,12 @@ public class EventHubsConsumer extends DefaultConsumer { */ private void processCommit(final Exchange exchange, final EventContext eventContext) { try { - eventContext.updateCheckpoint(); + if (processCheckpoint(exchange, eventContext)) { + eventContext.updateCheckpoint(); + processedEvents.set(0); + } else { + processedEvents.incrementAndGet(); + } } catch (Exception ex) { getExceptionHandler().handleException("Error occurred during updating the checkpoint. 
This exception is ignored.", exchange, ex); @@ -164,4 +181,31 @@ public class EventHubsConsumer extends DefaultConsumer { getExceptionHandler().handleException("Error during processing exchange.", exchange, cause); } } + + /** + * Checks whether we need to update the checkpoint or not + * + * @param exchange the exchange + * + * @return true if at least one of the two conditions (batch size or batch timeout) is met, else false + */ + private boolean processCheckpoint(Exchange exchange, EventContext eventContext) { + var checkpointByBatchSize = processedEvents.get() % getEndpoint().getConfiguration().getCheckpointBatchSize() == 0; + if (checkpointByBatchSize) { + exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_SIZE); + } + + var now = System.currentTimeMillis(); + var checkpointByBatchTimeout = false; + if (now >= checkpointCompletionTimeout + getEndpoint().getConfiguration().getCheckpointBatchTimeout()) { + checkpointCompletionTimeout = now; + checkpointByBatchTimeout = true; + } + + if (checkpointByBatchTimeout) { + exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_TIMEOUT); + } + + return checkpointByBatchSize || checkpointByBatchTimeout; + } } diff --git a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsComponentTest.java b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsComponentTest.java index 1f6b7f9124e..fa2f4e17872 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsComponentTest.java +++ b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsComponentTest.java @@ -111,7 +111,8 @@ class EventHubsComponentTest extends CamelTestSupport { public void testCreateEndpointWithConfig() { final String uri = 
"azure-eventhubs:namespace/hubName?sharedAccessName=DummyAccessKeyName" + "&sharedAccessKey=DummyKey" - + "&consumerGroupName=testConsumer&prefetchCount=100"; + + "&consumerGroupName=testConsumer&prefetchCount=100" + + "&checkpointBatchSize=100&checkpointBatchTimeout=1000"; final EventHubsEndpoint endpoint = (EventHubsEndpoint) context.getEndpoint(uri); @@ -121,6 +122,8 @@ class EventHubsComponentTest extends CamelTestSupport { assertEquals("DummyAccessKeyName", endpoint.getConfiguration().getSharedAccessName()); assertEquals("DummyKey", endpoint.getConfiguration().getSharedAccessKey()); assertEquals(100, endpoint.getConfiguration().getPrefetchCount()); + assertEquals(100, endpoint.getConfiguration().getCheckpointBatchSize()); + assertEquals(1000, endpoint.getConfiguration().getCheckpointBatchTimeout()); } private String getErrorMessage(final String uri) {
