This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e8db53f88 NIFI-12306 ConsumeAzureEventHub logs partition ownership 
changes at info level
3e8db53f88 is described below

commit 3e8db53f8822d98c007d9d95b10a7d3944d8124e
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Nov 1 17:09:57 2023 +0100

    NIFI-12306 ConsumeAzureEventHub logs partition ownership changes at info 
level
    
    This closes #7970
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../azure/eventhub/ConsumeAzureEventHub.java          | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index a3aef2affd..33b80f0daa 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.processors.azure.eventhub;
 
 import com.azure.core.amqp.AmqpTransportType;
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpException;
 import com.azure.core.credential.AzureNamedKeyCredential;
 import com.azure.identity.ManagedIdentityCredential;
 import com.azure.identity.ManagedIdentityCredentialBuilder;
@@ -505,12 +507,27 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor implem
 
     private final Consumer<ErrorContext> errorProcessor = errorContext -> {
         final PartitionContext partitionContext = 
errorContext.getPartitionContext();
+        final Throwable throwable = errorContext.getThrowable();
+
+        if (throwable instanceof AmqpException) {
+            final AmqpException amqpException = (AmqpException) throwable;
+            if (amqpException.getErrorCondition() == 
AmqpErrorCondition.LINK_STOLEN) {
+                getLogger().info("Partition was stolen by another consumer 
instance from the consumer group. Namespace [{}] Event Hub [{}] Consumer Group 
[{}] Partition [{}]. {}",
+                        partitionContext.getFullyQualifiedNamespace(),
+                        partitionContext.getEventHubName(),
+                        partitionContext.getConsumerGroup(),
+                        partitionContext.getPartitionId(),
+                        amqpException.getMessage());
+                return;
+            }
+        }
+
         getLogger().error("Receive Events failed Namespace [{}] Event Hub [{}] 
Consumer Group [{}] Partition [{}]",
                 partitionContext.getFullyQualifiedNamespace(),
                 partitionContext.getEventHubName(),
                 partitionContext.getConsumerGroup(),
                 partitionContext.getPartitionId(),
-                errorContext.getThrowable()
+                throwable
         );
     };
 

Reply via email to