This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 06ec6ab Add fix for header override by Azure Storage Blob download
consumer (#4949)
06ec6ab is described below
commit 06ec6abfd9dfa16e875af8cd0cb6c85bdc2ea318
Author: Mark Andreev <[email protected]>
AuthorDate: Fri Jan 29 18:52:40 2021 +0300
Add fix for header override by Azure Storage Blob download consumer (#4949)
---
.../component/azure/storage/queue/QueueConsumer.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git
a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
index fad58ec..f391e0a 100644
---
a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
+++
b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.azure.storage.queue;
+import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@@ -111,10 +112,24 @@ public class QueueConsumer extends
ScheduledBatchPollingConsumer {
// update pending number of exchanges
pendingExchanges = total - index - 1;
+ // copy messageId, popReceipt, timeout for fix exchange override
case
+ // azure storage blob can override this headers
+ final String messageId = exchange.getIn()
+ .getHeader(QueueConstants.MESSAGE_ID, String.class);
+ final String popReceipt = exchange.getIn()
+ .getHeader(QueueConstants.POP_RECEIPT, String.class);
+ final Duration timeout = exchange.getIn()
+ .getHeader(QueueConstants.TIMEOUT, Duration.class);
+
// add on completion to handle after work when the exchange is done
exchange.adapt(ExtendedExchange.class).addOnCompletion(new
Synchronization() {
@Override
public void onComplete(Exchange exchange) {
+ // past messageId, popReceipt, timeout for fix exchange
override case
+ exchange.getIn().setHeader(QueueConstants.MESSAGE_ID,
messageId);
+ exchange.getIn().setHeader(QueueConstants.POP_RECEIPT,
popReceipt);
+ exchange.getIn().setHeader(QueueConstants.TIMEOUT,
timeout);
+
processCommit(exchange);
}