This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 0679942fb3 NIFI-12306 ConsumeAzureEventHub logs partition ownership
changes at info level
0679942fb3 is described below
commit 0679942fb3013188b0e86f0a39741b4b674bad3f
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Nov 1 17:09:57 2023 +0100
NIFI-12306 ConsumeAzureEventHub logs partition ownership changes at info
level
This closes #7970
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit 3e8db53f8822d98c007d9d95b10a7d3944d8124e)
---
.../azure/eventhub/ConsumeAzureEventHub.java | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 29170f938a..5d7d9e87cd 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -17,6 +17,8 @@
package org.apache.nifi.processors.azure.eventhub;
import com.azure.core.amqp.AmqpTransportType;
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
@@ -514,12 +516,27 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
private final Consumer<ErrorContext> errorProcessor = errorContext -> {
final PartitionContext partitionContext =
errorContext.getPartitionContext();
+ final Throwable throwable = errorContext.getThrowable();
+
+ if (throwable instanceof AmqpException) {
+ final AmqpException amqpException = (AmqpException) throwable;
+ if (amqpException.getErrorCondition() ==
AmqpErrorCondition.LINK_STOLEN) {
+ getLogger().info("Partition was stolen by another consumer
instance from the consumer group. Namespace [{}] Event Hub [{}] Consumer Group
[{}] Partition [{}]. {}",
+ partitionContext.getFullyQualifiedNamespace(),
+ partitionContext.getEventHubName(),
+ partitionContext.getConsumerGroup(),
+ partitionContext.getPartitionId(),
+ amqpException.getMessage());
+ return;
+ }
+ }
+
getLogger().error("Receive Events failed Namespace [{}] Event Hub [{}]
Consumer Group [{}] Partition [{}]",
partitionContext.getFullyQualifiedNamespace(),
partitionContext.getEventHubName(),
partitionContext.getConsumerGroup(),
partitionContext.getPartitionId(),
- errorContext.getThrowable()
+ throwable
);
};