This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new dbc627e0b0 NIFI-11261 Added Primary Node State handling to
GetAzureEventHub
dbc627e0b0 is described below
commit dbc627e0b0b6602231955153474943c3763cc448
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Mar 8 14:53:52 2023 -0600
NIFI-11261 Added Primary Node State handling to GetAzureEventHub
- Updated Qpid Proton J from 0.34.0 to 0.34.1
This closes #7023.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../azure/eventhub/GetAzureEventHub.java | 98 ++++++++++++++++++++--
.../azure/eventhub/GetAzureEventHubTest.java | 49 ++++++++++-
nifi-nar-bundles/nifi-azure-bundle/pom.xml | 2 +-
3 files changed, 140 insertions(+), 9 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index d9f00839d3..855b531d92 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -23,13 +23,16 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Optional;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import com.azure.core.amqp.AmqpClientOptions;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
@@ -49,10 +52,13 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@@ -61,6 +67,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@@ -85,6 +92,8 @@ public class GetAzureEventHub extends AbstractProcessor {
private static final Duration DEFAULT_FETCH_TIMEOUT =
Duration.ofSeconds(60);
private static final int DEFAULT_FETCH_SIZE = 100;
+ private static final String NODE_CLIENT_IDENTIFIER_FORMAT = "%s-%s";
+
static final PropertyDescriptor EVENT_HUB_NAME = new
PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("Name of Azure Event Hubs source")
@@ -180,10 +189,16 @@ public class GetAzureEventHub extends AbstractProcessor {
private final Map<String, EventPosition> partitionEventPositions = new
ConcurrentHashMap<>();
- private volatile BlockingQueue<String> partitionIds = new
LinkedBlockingQueue<>();
+ private final BlockingQueue<String> partitionIds = new
LinkedBlockingQueue<>();
+
+ private final AtomicReference<ExecutionNode> configuredExecutionNode = new
AtomicReference<>(ExecutionNode.ALL);
+
private volatile int receiverFetchSize;
+
private volatile Duration receiverFetchTimeout;
+ private EventHubClientBuilder configuredClientBuilder;
+
private EventHubConsumerClient eventHubConsumerClient;
@Override
@@ -201,20 +216,40 @@ public class GetAzureEventHub extends AbstractProcessor {
return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, context);
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeStateChange(final PrimaryNodeState
primaryNodeState) {
+ final ExecutionNode executionNode = configuredExecutionNode.get();
+ if (executionNode == ExecutionNode.PRIMARY) {
+ if (PrimaryNodeState.PRIMARY_NODE_REVOKED == primaryNodeState) {
+ closeClient();
+ getLogger().info("Consumer Client closed based on Execution
Node [{}] and Primary Node State [{}]", executionNode, primaryNodeState);
+ } else {
+ createClient();
+ getLogger().info("Consumer Client created based on Execution
Node [{}] and Primary Node State [{}]", executionNode, primaryNodeState);
+ }
+ } else {
+ getLogger().debug("Consumer Client not changed based on Execution
Node [{}]", executionNode);
+ }
+ }
+
@OnStopped
public void closeClient() {
+ partitionIds.clear();
partitionEventPositions.clear();
if (eventHubConsumerClient == null) {
- getLogger().info("Azure Event Hub Consumer Client not configured");
+ getLogger().debug("Consumer Client not configured");
} else {
eventHubConsumerClient.close();
+ getLogger().info("Consumer Client for Event Hub [{}] closed",
eventHubConsumerClient.getEventHubName());
}
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
- eventHubConsumerClient = createEventHubConsumerClient(context);
+ configuredExecutionNode.set(context.getExecutionNode());
+ configuredClientBuilder = createEventHubClientBuilder(context);
+ createClient();
if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
receiverFetchSize =
context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
@@ -227,8 +262,6 @@ public class GetAzureEventHub extends AbstractProcessor {
receiverFetchTimeout = DEFAULT_FETCH_TIMEOUT;
}
- this.partitionIds = getPartitionIds();
-
final PropertyValue enqueuedTimeProperty =
context.getProperty(ENQUEUE_TIME);
final Instant initialEnqueuedTime;
if (enqueuedTimeProperty.isSet()) {
@@ -310,7 +343,30 @@ public class GetAzureEventHub extends AbstractProcessor {
return eventHubConsumerClient.receiveFromPartition(partitionId,
receiverFetchSize, eventPosition, receiverFetchTimeout);
}
- private EventHubConsumerClient createEventHubConsumerClient(final
ProcessContext context) {
+ private void createClient() {
+ if (isCreateClientEnabled()) {
+ closeClient();
+ eventHubConsumerClient =
configuredClientBuilder.buildConsumerClient();
+ partitionIds.addAll(getPartitionIds());
+ getLogger().info("Consumer Client created for Event Hub [{}]
Partitions {}", eventHubConsumerClient.getEventHubName(), partitionIds);
+ }
+ }
+
+ private boolean isCreateClientEnabled() {
+ final boolean enabled;
+
+ final ExecutionNode executionNode = configuredExecutionNode.get();
+ if (ExecutionNode.PRIMARY == executionNode) {
+ final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
+ enabled = nodeTypeProvider.isPrimary();
+ } else {
+ enabled = true;
+ }
+
+ return enabled;
+ }
+
+ private EventHubClientBuilder createEventHubClientBuilder(final
ProcessContext context) {
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
@@ -332,7 +388,14 @@ public class GetAzureEventHub extends AbstractProcessor {
final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
}
- return eventHubClientBuilder.buildConsumerClient();
+
+ // Set Azure Event Hub Client Identifier using Processor Identifier
instead of default random UUID
+ final AmqpClientOptions clientOptions = new AmqpClientOptions();
+ final String clientIdentifier = getClientIdentifier();
+ clientOptions.setIdentifier(clientIdentifier);
+ eventHubClientBuilder.clientOptions(clientOptions);
+
+ return eventHubClientBuilder;
}
private String getTransitUri(final String partitionId) {
@@ -344,6 +407,27 @@ public class GetAzureEventHub extends AbstractProcessor {
);
}
+ private String getClientIdentifier() {
+ final String clientIdentifier;
+
+ final String componentIdentifier = getIdentifier();
+
+ final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
+ if (nodeTypeProvider.isClustered()) {
+ final Optional<String> currentNode =
nodeTypeProvider.getCurrentNode();
+ if (currentNode.isPresent()) {
+ final String currentNodeId = currentNode.get();
+ clientIdentifier =
String.format(NODE_CLIENT_IDENTIFIER_FORMAT, currentNodeId,
componentIdentifier);
+ } else {
+ clientIdentifier = componentIdentifier;
+ }
+ } else {
+ clientIdentifier = componentIdentifier;
+ }
+
+ return clientIdentifier;
+ }
+
private Map<String, String> getAttributes(final PartitionEvent
partitionEvent) {
final Map<String, String> attributes = new LinkedHashMap<>();
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index a46aa54558..e40d714111 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -20,6 +20,9 @@ import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -33,8 +36,11 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
public class GetAzureEventHubTest {
- private static final String DOMAIN_NAME = "servicebus";
+ private static final String DOMAIN_NAME = "DOMAIN";
private static final String EVENT_HUB_NAMESPACE = "NAMESPACE";
private static final String EVENT_HUB_NAME = "NAME";
private static final String POLICY_NAME = "POLICY";
@@ -111,6 +117,47 @@ public class GetAzureEventHubTest {
flowFile.assertAttributeEquals("eventhub.name", EVENT_HUB_NAME);
}
+ @Test
+ public void testPrimaryNodeRevoked() {
+ setProperties();
+
+ final ProcessContext processContext =
spy(testRunner.getProcessContext());
+
when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
+
+ testRunner.setIsConfiguredForClustering(true);
+ testRunner.setPrimaryNode(true);
+ final GetAzureEventHub processor = (GetAzureEventHub)
testRunner.getProcessor();
+ processor.onScheduled(processContext);
+
processor.onPrimaryNodeStateChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
+
+ final PartitionEvent partitionEvent = createPartitionEvent();
+ partitionEvents.add(partitionEvent);
+
+ testRunner.run(1, true, false);
+ testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS,
0);
+ }
+
+ @Test
+ public void testPrimaryNodeRevokedThenElected() {
+ setProperties();
+
+ final ProcessContext processContext =
spy(testRunner.getProcessContext());
+
when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
+
+ testRunner.setIsConfiguredForClustering(true);
+ testRunner.setPrimaryNode(true);
+ final GetAzureEventHub processor = (GetAzureEventHub)
testRunner.getProcessor();
+ processor.onScheduled(processContext);
+
processor.onPrimaryNodeStateChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
+
processor.onPrimaryNodeStateChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+ final PartitionEvent partitionEvent = createPartitionEvent();
+ partitionEvents.add(partitionEvent);
+
+ testRunner.run(1, true, false);
+ testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS,
1);
+ }
+
private class MockGetAzureEventHub extends GetAzureEventHub {
@Override
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index 9e674eea08..d88bfc589b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -29,7 +29,7 @@
<azure.sdk.bom.version>1.2.9</azure.sdk.bom.version>
<microsoft.azure-storage.version>8.6.6</microsoft.azure-storage.version>
<msal4j.version>1.13.3</msal4j.version>
- <qpid.proton.version>0.34.0</qpid.proton.version>
+ <qpid.proton.version>0.34.1</qpid.proton.version>
</properties>
<modules>