exceptionfactory commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r977179923


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -307,95 +217,125 @@ public void onScheduled(final ProcessContext context) 
throws ProcessException, U
         final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
         final String serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
         final boolean useManagedIdentity = 
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
-        final String connectionString;
+        final String fullyQualifiedNamespace = String.format("%s%s", 
namespace, serviceBusEndpoint);
 
-        if(useManagedIdentity){
-            connectionString = 
AzureEventHubUtils.getManagedIdentityConnectionString(namespace, 
serviceBusEndpoint, eventHubName);
+        final EventHubClientBuilder eventHubClientBuilder = new 
EventHubClientBuilder();
+
+        final String consumerGroup = 
context.getProperty(CONSUMER_GROUP).getValue();
+        eventHubClientBuilder.consumerGroup(consumerGroup);
+
+        if (useManagedIdentity) {
+            final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+            final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
         } else {
             final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
             final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
-            connectionString = new ConnectionStringBuilder()
-                                    .setEndpoint(new 
URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace, 
serviceBusEndpoint)))
-                                    .setEventHubName(eventHubName)
-                                    .setSasKeyName(policyName)
-                                    .setSasKey(policyKey).toString();
+            final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
         }
+        eventHubConsumerClient = eventHubClientBuilder.buildConsumerClient();
 
-        if(context.getProperty(ENQUEUE_TIME).isSet()) {
-            configuredEnqueueTime = 
Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
+        final PropertyValue enqueuedTimeProperty = 
context.getProperty(ENQUEUE_TIME);
+        if (enqueuedTimeProperty.isSet()) {
+            initialEnqueuedTime = 
Instant.parse(enqueuedTimeProperty.getValue());
         } else {
-            configuredEnqueueTime = null;
+            initialEnqueuedTime = Instant.now();
         }
-        if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+
+        if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
             receiverFetchSize = 
context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
         } else {
             receiverFetchSize = 100;
         }
-        if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+        if (context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
             receiverFetchTimeout = 
Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
         } else {
-            receiverFetchTimeout = null;
+            receiverFetchTimeout = Duration.ofMillis(60000);
         }
-
-        executor = Executors.newScheduledThreadPool(4);
-        setupReceiver(connectionString, executor);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final BlockingQueue<String> partitionIds = this.partitionNames;
-        final String partitionId = partitionIds.poll();
+        final String partitionId = partitionNames.poll();
         if (partitionId == null) {
             getLogger().debug("No partitions available");
             return;
         }
 
         final StopWatch stopWatch = new StopWatch(true);
         try {
+            final Iterable<PartitionEvent> events = receiveEvents(partitionId);
+            for (final PartitionEvent partitionEvent : events) {
+                final Map<String, String> attributes = 
getAttributes(partitionEvent);
 
-            final Iterable<EventData> receivedEvents = receiveEvents(context, 
partitionId);
-            if (receivedEvents == null) {
-                return;
-            }
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
 
-            for (final EventData eventData : receivedEvents) {
-                if (null != eventData) {
+                final EventData eventData = partitionEvent.getData();
+                final byte[] body = eventData.getBody();
+                flowFile = session.write(flowFile, outputStream -> 
outputStream.write(body));
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    FlowFile flowFile = session.create();
-                    final EventData.SystemProperties systemProperties = 
eventData.getSystemProperties();
+                session.transfer(flowFile, REL_SUCCESS);
 
-                    if (null != systemProperties) {
-                        attributes.put("eventhub.enqueued.timestamp", 
String.valueOf(systemProperties.getEnqueuedTime()));
-                        attributes.put("eventhub.offset", 
systemProperties.getOffset());
-                        attributes.put("eventhub.sequence", 
String.valueOf(systemProperties.getSequenceNumber()));
-                    }
+                final String transitUri = getTransitUri(partitionId);
+                session.getProvenanceReporter().receive(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+        } finally {
+            partitionNames.offer(partitionId);
+        }
+    }
 
-                    final Map<String,String> applicationProperties = 
AzureEventHubUtils.getApplicationProperties(eventData);
-                    attributes.putAll(applicationProperties);
+    /**
+     * Receive Events from specified partition is synchronized to avoid 
concurrent requests for the same partition
+     *
+     * @param partitionId Partition Identifier
+     * @return Iterable of Partition Events or empty when none received
+     */
+    protected synchronized Iterable<PartitionEvent> receiveEvents(final String 
partitionId) {
+        final EventPosition eventPosition;
 
-                    attributes.put("eventhub.name", 
context.getProperty(EVENT_HUB_NAME).getValue());
-                    attributes.put("eventhub.partition", partitionId);
+        if (initialEventPosition == null) {
+            getLogger().debug("Receiving Events for Partition [{}] from 
Initial Enqueued Time [{}]", partitionId, initialEnqueuedTime);
+            initialEventPosition = 
EventPosition.fromEnqueuedTime(initialEnqueuedTime);
+            eventPosition = initialEventPosition;
+        } else {
+            final PartitionProperties partitionProperties = 
eventHubConsumerClient.getPartitionProperties(partitionId);
+            final Instant lastEnqueuedTime = 
partitionProperties.getLastEnqueuedTime();
+            getLogger().debug("Receiving Events for Partition [{}] from Last 
Enqueued Time [{}]", partitionId, lastEnqueuedTime);
+            eventPosition = EventPosition.fromEnqueuedTime(lastEnqueuedTime);

Review Comment:
   Thanks for clarifying the issue and pointing out the differences. I updated 
`GetAzureEventHub` to track the last Event Position for each partition in a 
`ConcurrentHashMap`. Each partition's position starts at the initial enqueued 
time configured in `onScheduled` and is then updated based on the last Sequence 
Number observed. This replaces the previous approach of starting each receive 
from the partition's last enqueued time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to