This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit abd1350f1da60d1fa02295f27b8828978a12e61d Author: Kévin Lacire <[email protected]> AuthorDate: Fri Apr 8 15:32:04 2022 +0200 :sparkles: use of timer & task to handle timeout batch's completion condition --- .../EventHubsCheckpointUpdaterTimerTask.java | 54 +++++++++++++++++ .../azure/eventhubs/EventHubsConstants.java | 13 +++- .../azure/eventhubs/EventHubsConsumer.java | 69 ++++++++++++++-------- 3 files changed, 111 insertions(+), 25 deletions(-) diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java new file mode 100644 index 00000000000..19c14c05e75 --- /dev/null +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.eventhubs; + +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicInteger; + +import com.azure.messaging.eventhubs.models.EventContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventHubsCheckpointUpdaterTimerTask extends TimerTask { + + private static final Logger LOG = LoggerFactory.getLogger(EventHubsCheckpointUpdaterTimerTask.class); + + private EventContext eventContext; + private final AtomicInteger processedEvents; + + public EventHubsCheckpointUpdaterTimerTask(EventContext eventContext, AtomicInteger processedEvents) { + super(); + + this.eventContext = eventContext; + this.processedEvents = processedEvents; + } + + @Override + public void run() { + if (processedEvents.get() > 0) { + LOG.debug("checkpointing offset after reaching timeout, with a batch of {}", processedEvents.get()); + eventContext.updateCheckpoint(); + processedEvents.set(0); + } else { + LOG.debug("skip checkpointing offset even if timeout is reached. No events processed"); + } + } + + public void setEventContext(EventContext eventContext) { + this.eventContext = eventContext; + } +} diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java index b746c12d076..f1f07ea567a 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConstants.java @@ -57,9 +57,20 @@ public final class EventHubsConstants { public static final String MESSAGE_TIMESTAMP = Exchange.MESSAGE_TIMESTAMP; @Metadata(label = "consumer", description = "It sets the reason for the checkpoint to have been updated. This is only present on a received `EventData`.", - javaType = "Boolean") + javaType = "String") public static final String CHECKPOINT_UPDATED_BY = HEADER_PREFIX + "CheckpointUpdatedBy"; + @Metadata(label = "consumer", + description = "Completion condition header's value for the checkpoint to be updated when batch size is reached", + javaType = "String") + public static final String COMPLETED_BY_SIZE = "size"; + + @Metadata(label = "consumer", + description = "Completion condition header's value for the checkpoint to be updated when batch timeout is reached", + javaType = "String") + public static final String COMPLETED_BY_TIMEOUT = "timeout"; + public static final String UNCOMPLETED = "uncompleted"; + private EventHubsConstants() { } } diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java index 917131c33d9..22616c705f2 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.azure.eventhubs; +import java.util.Timer; import java.util.concurrent.atomic.AtomicInteger; import com.azure.messaging.eventhubs.EventProcessorClient; @@ -32,31 +33,31 @@ import org.apache.camel.support.DefaultConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.azure.eventhubs.EventHubsConstants.*; + public class EventHubsConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(EventHubsConsumer.class); - private static final String COMPLETED_BY_SIZE = "size"; - private static final String COMPLETED_BY_TIMEOUT = "timeout"; - // we use the EventProcessorClient as recommended by Azure docs to consume from all partitions private EventProcessorClient processorClient; - private AtomicInteger processedEvents; + private final AtomicInteger processedEvents; + private final Timer timer; - private Long checkpointCompletionTimeout; + private EventHubsCheckpointUpdaterTimerTask lastTask; public EventHubsConsumer(final EventHubsEndpoint endpoint, final Processor processor) { super(endpoint, processor); + + this.processedEvents = new AtomicInteger(0); + this.timer = new Timer(); } @Override protected void doStart() throws Exception { super.doStart(); - checkpointCompletionTimeout = System.currentTimeMillis(); - processedEvents = new AtomicInteger(0); - // create the client processorClient = EventHubsClientFactory.createEventProcessorClient(getConfiguration(), this::onEventListener, this::onErrorListener); @@ -157,13 +158,25 @@ public class EventHubsConsumer extends DefaultConsumer { * @param exchange the exchange */ private void processCommit(final Exchange exchange, final EventContext eventContext) { + if (lastTask == null || System.currentTimeMillis() > lastTask.scheduledExecutionTime()) { + lastTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, processedEvents); + // delegate the checkpoint update to a dedicated Thread + timer.schedule(lastTask, getConfiguration().getCheckpointBatchTimeout()); + } else { + // updates the eventContext to use for the offset to be the most accurate + lastTask.setEventContext(eventContext); + } + try { - if (processCheckpoint(exchange, eventContext)) { + var completionCondition = processCheckpoint(exchange); + if (completionCondition.equals(COMPLETED_BY_SIZE)) { eventContext.updateCheckpoint(); processedEvents.set(0); - } else { + } else if (!completionCondition.equals(COMPLETED_BY_TIMEOUT)) { processedEvents.incrementAndGet(); } + // we assume that the timer task has done the update by its side + } catch (Exception ex) { getExceptionHandler().handleException("Error occurred during updating the checkpoint. This exception is ignored.", exchange, ex); @@ -183,29 +196,37 @@ public class EventHubsConsumer extends DefaultConsumer { } /** - * Checks whether we need to update the checkpoint or not + * Checks either the batch size or the batch timeout is reached * * @param exchange the exchange - * - * @return true if at least one of the two conditions (batch size or batch timeout) are met, else false + * @return the completion condition (batch size or batch timeout) if one of them is reached, else the + * 'uncompleted' state adds a header {@value EventHubsConstants#CHECKPOINT_UPDATED_BY} with the + * completion condition ( */ - private boolean processCheckpoint(Exchange exchange, EventContext eventContext) { - var checkpointByBatchSize = processedEvents.get() % getEndpoint().getConfiguration().getCheckpointBatchSize() == 0; - if (checkpointByBatchSize) { + private String processCheckpoint(Exchange exchange) { + // Check if the batch size is reached + if (processedEvents.get() % getConfiguration().getCheckpointBatchSize() == 0) { exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_SIZE); - } + LOG.debug("eventhub consumer batch size of reached"); + // no need to run task if the batch size already did the checkpointing + if (lastTask != null) { + lastTask.cancel(); + } - var now = System.currentTimeMillis(); - var checkpointByBatchTimeout = false; - if (now >= checkpointCompletionTimeout + getEndpoint().getConfiguration().getCheckpointBatchTimeout()) { - checkpointCompletionTimeout = now; - checkpointByBatchTimeout = true; + return COMPLETED_BY_SIZE; + } else { + LOG.debug("eventhub consumer batch size of {}/{} not reached yet", processedEvents.get(), + getConfiguration().getCheckpointBatchSize()); } - if (checkpointByBatchTimeout) { + // Check if the batch timeout is reached + if (System.currentTimeMillis() >= lastTask.scheduledExecutionTime()) { exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, COMPLETED_BY_TIMEOUT); + LOG.debug("eventhub consumer batch timeout reached"); + + return COMPLETED_BY_TIMEOUT; } - return checkpointByBatchSize || checkpointByBatchTimeout; + return UNCOMPLETED; } }
