turcsanyip commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r977030508


##########
nifi-nar-bundles/nifi-azure-bundle/pom.xml:
##########
@@ -26,12 +26,9 @@
     <packaging>pom</packaging>
 
     <properties>
-        <azure-storage.version>8.6.6</azure-storage.version>
-        <azure.core.version>1.26.0</azure.core.version>
-        <azure.identity.version>1.4.5</azure.identity.version>
-        <!-- azure-identity depends on msal4j transitively, keep these 
versions consistent -->
-        <msal4j.version>1.11.0</msal4j.version>
-        <azure-cosmos.version>4.26.0</azure-cosmos.version>
+        <azure.sdk.bom.version>1.2.4</azure.sdk.bom.version>

Review Comment:
   Could you please bump the BOM version to the latest `1.2.6`?
   I still have trouble restarting the Consume processor (no messages processed after restart), but it seems to work with that version.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -381,313 +366,285 @@ protected Collection<ValidationResult> 
customValidate(ValidationContext validati
     }
 
     @Override
-    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (RECORD_READER.equals(descriptor)) {
-            isRecordReaderSet = !StringUtils.isEmpty(newValue);
+            isRecordReaderSet = StringUtils.isNotEmpty(newValue);
         } else if (RECORD_WRITER.equals(descriptor)) {
-            isRecordWriterSet = !StringUtils.isEmpty(newValue);
-        }
-    }
-
-    public class EventProcessorFactory implements 
IEventProcessorFactory<EventProcessor> {
-        @Override
-        public EventProcessor createEventProcessor(PartitionContext context) 
throws Exception {
-            final EventProcessor eventProcessor = new EventProcessor();
-            return eventProcessor;
+            isRecordWriterSet = StringUtils.isNotEmpty(newValue);
         }
     }
 
-    public class EventProcessor implements IEventProcessor {
+    @Override
+    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+        if (eventProcessorClient == null) {
+            eventProcessorClient = createClient(context);
+            eventProcessorClient.start();
 
-        @Override
-        public void onOpen(PartitionContext context) throws Exception {
-            getLogger().info("Consumer group {} opened partition {} of {}",
-                    new Object[]{context.getConsumerGroupName(), 
context.getPartitionId(), context.getEventHubPath()});
-        }
+            processSessionFactory = sessionFactory;

Review Comment:
   I'd suggest setting `processSessionFactory` before starting the client 
because the client's event handler may try to use it before the variable gets 
initialized (almost zero chance though).



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -307,95 +217,125 @@ public void onScheduled(final ProcessContext context) 
throws ProcessException, U
         final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
         final String serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
         final boolean useManagedIdentity = 
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
-        final String connectionString;
+        final String fullyQualifiedNamespace = String.format("%s%s", 
namespace, serviceBusEndpoint);
 
-        if(useManagedIdentity){
-            connectionString = 
AzureEventHubUtils.getManagedIdentityConnectionString(namespace, 
serviceBusEndpoint, eventHubName);
+        final EventHubClientBuilder eventHubClientBuilder = new 
EventHubClientBuilder();
+
+        final String consumerGroup = 
context.getProperty(CONSUMER_GROUP).getValue();
+        eventHubClientBuilder.consumerGroup(consumerGroup);
+
+        if (useManagedIdentity) {
+            final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+            final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
         } else {
             final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
             final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
-            connectionString = new ConnectionStringBuilder()
-                                    .setEndpoint(new 
URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace, 
serviceBusEndpoint)))
-                                    .setEventHubName(eventHubName)
-                                    .setSasKeyName(policyName)
-                                    .setSasKey(policyKey).toString();
+            final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
         }
+        eventHubConsumerClient = eventHubClientBuilder.buildConsumerClient();
 
-        if(context.getProperty(ENQUEUE_TIME).isSet()) {
-            configuredEnqueueTime = 
Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
+        final PropertyValue enqueuedTimeProperty = 
context.getProperty(ENQUEUE_TIME);
+        if (enqueuedTimeProperty.isSet()) {
+            initialEnqueuedTime = 
Instant.parse(enqueuedTimeProperty.getValue());
         } else {
-            configuredEnqueueTime = null;
+            initialEnqueuedTime = Instant.now();
         }
-        if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+
+        if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
             receiverFetchSize = 
context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
         } else {
             receiverFetchSize = 100;
         }
-        if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+        if (context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
             receiverFetchTimeout = 
Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
         } else {
-            receiverFetchTimeout = null;
+            receiverFetchTimeout = Duration.ofMillis(60000);
         }
-
-        executor = Executors.newScheduledThreadPool(4);
-        setupReceiver(connectionString, executor);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final BlockingQueue<String> partitionIds = this.partitionNames;
-        final String partitionId = partitionIds.poll();
+        final String partitionId = partitionNames.poll();
         if (partitionId == null) {
             getLogger().debug("No partitions available");
             return;
         }
 
         final StopWatch stopWatch = new StopWatch(true);
         try {
+            final Iterable<PartitionEvent> events = receiveEvents(partitionId);
+            for (final PartitionEvent partitionEvent : events) {
+                final Map<String, String> attributes = 
getAttributes(partitionEvent);
 
-            final Iterable<EventData> receivedEvents = receiveEvents(context, 
partitionId);
-            if (receivedEvents == null) {
-                return;
-            }
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
 
-            for (final EventData eventData : receivedEvents) {
-                if (null != eventData) {
+                final EventData eventData = partitionEvent.getData();
+                final byte[] body = eventData.getBody();
+                flowFile = session.write(flowFile, outputStream -> 
outputStream.write(body));
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    FlowFile flowFile = session.create();
-                    final EventData.SystemProperties systemProperties = 
eventData.getSystemProperties();
+                session.transfer(flowFile, REL_SUCCESS);
 
-                    if (null != systemProperties) {
-                        attributes.put("eventhub.enqueued.timestamp", 
String.valueOf(systemProperties.getEnqueuedTime()));
-                        attributes.put("eventhub.offset", 
systemProperties.getOffset());
-                        attributes.put("eventhub.sequence", 
String.valueOf(systemProperties.getSequenceNumber()));
-                    }
+                final String transitUri = getTransitUri(partitionId);
+                session.getProvenanceReporter().receive(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+        } finally {
+            partitionNames.offer(partitionId);
+        }
+    }
 
-                    final Map<String,String> applicationProperties = 
AzureEventHubUtils.getApplicationProperties(eventData);
-                    attributes.putAll(applicationProperties);
+    /**
+     * Receive Events from specified partition is synchronized to avoid 
concurrent requests for the same partition
+     *
+     * @param partitionId Partition Identifier
+     * @return Iterable of Partition Events or empty when none received
+     */
+    protected synchronized Iterable<PartitionEvent> receiveEvents(final String 
partitionId) {
+        final EventPosition eventPosition;
 
-                    attributes.put("eventhub.name", 
context.getProperty(EVENT_HUB_NAME).getValue());
-                    attributes.put("eventhub.partition", partitionId);
+        if (initialEventPosition == null) {
+            getLogger().debug("Receiving Events for Partition [{}] from 
Initial Enqueued Time [{}]", partitionId, initialEnqueuedTime);
+            initialEventPosition = 
EventPosition.fromEnqueuedTime(initialEnqueuedTime);
+            eventPosition = initialEventPosition;
+        } else {
+            final PartitionProperties partitionProperties = 
eventHubConsumerClient.getPartitionProperties(partitionId);
+            final Instant lastEnqueuedTime = 
partitionProperties.getLastEnqueuedTime();
+            getLogger().debug("Receiving Events for Partition [{}] from Last 
Enqueued Time [{}]", partitionId, lastEnqueuedTime);
+            eventPosition = EventPosition.fromEnqueuedTime(lastEnqueuedTime);

Review Comment:
   `eventPosition` is determined from the current latest event time here, but we should continue where the previous receive call left off. The current implementation skips the events published between two `onTrigger` calls.
   
   I see the root cause is that the previous version of the library provided a 
stateful `PartitionReceiver` object we stored in a map per partition 
`ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap`. This 
receiver object stored the lastly fetched event offset (or similar data) and 
the next `receive()` call started with the subsequent event after that.
   
   Now in the new API version, we have a stateless `receiveFromPartition()` method, so I think we need to store the last offset positions for the partitions ourselves. The position can be retrieved from `EventData` (not sure whether `getSequenceNumber()` or `getOffset()` should be used). I checked the sequence number: the `Iterable` returned by `receiveFromPartition()` seems to emit the events ordered by sequence number, so the last one should be the latest we need to store.
   



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -381,313 +366,285 @@ protected Collection<ValidationResult> 
customValidate(ValidationContext validati
     }
 
     @Override
-    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (RECORD_READER.equals(descriptor)) {
-            isRecordReaderSet = !StringUtils.isEmpty(newValue);
+            isRecordReaderSet = StringUtils.isNotEmpty(newValue);
         } else if (RECORD_WRITER.equals(descriptor)) {
-            isRecordWriterSet = !StringUtils.isEmpty(newValue);
-        }
-    }
-
-    public class EventProcessorFactory implements 
IEventProcessorFactory<EventProcessor> {
-        @Override
-        public EventProcessor createEventProcessor(PartitionContext context) 
throws Exception {
-            final EventProcessor eventProcessor = new EventProcessor();
-            return eventProcessor;
+            isRecordWriterSet = StringUtils.isNotEmpty(newValue);
         }
     }
 
-    public class EventProcessor implements IEventProcessor {
+    @Override
+    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+        if (eventProcessorClient == null) {
+            eventProcessorClient = createClient(context);
+            eventProcessorClient.start();
 
-        @Override
-        public void onOpen(PartitionContext context) throws Exception {
-            getLogger().info("Consumer group {} opened partition {} of {}",
-                    new Object[]{context.getConsumerGroupName(), 
context.getPartitionId(), context.getEventHubPath()});
-        }
+            processSessionFactory = sessionFactory;
 
-        @Override
-        public void onClose(PartitionContext context, CloseReason reason) 
throws Exception {
-            getLogger().info("Consumer group {} closed partition {} of {}. 
reason={}",
-                    new Object[]{context.getConsumerGroupName(), 
context.getPartitionId(), context.getEventHubPath(), reason});
+            readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         }
 
-        @Override
-        public void onEvents(PartitionContext context, Iterable<EventData> 
messages) throws Exception {
-            final ProcessSession session = 
processSessionFactory.createSession();
-
-            try {
-
-                final StopWatch stopWatch = new StopWatch(true);
-
-                if (readerFactory != null && writerFactory != null) {
-                    writeRecords(context, messages, session, stopWatch);
-                } else {
-                    writeFlowFiles(context, messages, session, stopWatch);
-                }
+        // After a EventProcessor is registered successfully, nothing has to 
be done at onTrigger
+        // because new sessions are created when new messages are arrived by 
the EventProcessor.
+        context.yield();
+    }
 
-                // Commit NiFi first.
-                // If creating an Event Hub checkpoint failed, then the same 
message can be retrieved again.
-                session.commitAsync(context::checkpoint);
-            } catch (Exception e) {
-                getLogger().error("Unable to fully process received message 
due to " + e, e);
-                // FlowFiles those are already committed will not get rollback.
-                session.rollback();
-            }
+    @OnStopped
+    public void stopClient() {
+        if (eventProcessorClient != null) {
+            eventProcessorClient.stop();
+            eventProcessorClient = null;
+            processSessionFactory = null;
+            readerFactory = null;
+            writerFactory = null;

Review Comment:
   I recommend calling the `close()` method in a try block and nulling the fields even if `close()` fails. Otherwise `onTrigger()` can never reinitialize the client because it is not null.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -205,98 +193,20 @@ public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
-        List<ValidationResult> retVal = 
AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
-        return retVal;
-    }
-
-    protected void setupReceiver(final String connectionString, final 
ScheduledExecutorService executor) throws ProcessException {
-        try {
-            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
-            eventHubClient = 
EventHubClient.createFromConnectionStringSync(connectionString, executor);
-        } catch (IOException | EventHubException e) {
-            throw new ProcessException(e);
-        }
-    }
-
-    PartitionReceiver getReceiver(final ProcessContext context, final String 
partitionId) throws IOException, EventHubException, ExecutionException, 
InterruptedException {
-        PartitionReceiver existingReceiver = 
partitionToReceiverMap.get(partitionId);
-        if (existingReceiver != null) {
-            return existingReceiver;
-        }
-
-        // we want to avoid allowing multiple threads to create Receivers 
simultaneously because that could result in
-        // having multiple Receivers for the same partition. So if the map 
does not contain a receiver for this partition,
-        // we will enter a synchronized block and check again (because once we 
enter the synchronized block, we know that no
-        // other thread is creating a client). If within the synchronized 
block, we still do not have an entry in the map,
-        // it is up to use to create the receiver, initialize it, and then put 
it into the map.
-        // We do not use the putIfAbsent method in order to do a CAS operation 
here because we want to also initialize the
-        // receiver if and only if it is not present in the map. As a result, 
we need to initialize the receiver and add it
-        // to the map atomically. Hence, the synchronized block.
-        synchronized (this) {
-            existingReceiver = partitionToReceiverMap.get(partitionId);
-            if (existingReceiver != null) {
-                return existingReceiver;
-            }
-
-            final String consumerGroupName = 
context.getProperty(CONSUMER_GROUP).getValue();
-
-            final PartitionReceiver receiver = eventHubClient.createReceiver(
-                    consumerGroupName,
-                    partitionId,
-                    EventPosition.fromEnqueuedTime(
-                            configuredEnqueueTime == null ? Instant.now() : 
configuredEnqueueTime)).get();
-
-            receiver.setReceiveTimeout(receiverFetchTimeout == null ? 
Duration.ofMillis(60000) : receiverFetchTimeout);
-            partitionToReceiverMap.put(partitionId, receiver);
-            return receiver;
-
-        }
-    }
-
-    /**
-     * This method is here to try and isolate the Azure related code as the 
PartitionReceiver cannot be mocked
-     * with PowerMock due to it being final. Unfortunately it extends a base 
class and does not implement an interface
-     * so even if we create a MockPartitionReciver, it will not work as the 
two classes are orthogonal.
-     *
-     * @param context     - The processcontext for this processor
-     * @param partitionId - The partition ID to retrieve a receiver by.
-     * @return - Returns the events received from the EventBus.
-     * @throws ProcessException -- If any exception is encountered, receiving 
events it is wrapped in a ProcessException
-     *                          and then that exception is thrown.
-     */
-    protected Iterable<EventData> receiveEvents(final ProcessContext context, 
final String partitionId) throws ProcessException {
-        final PartitionReceiver receiver;
-        try {
-            receiver = getReceiver(context, partitionId);
-            return receiver.receive(receiverFetchSize).get();
-        } catch (final EventHubException | IOException | ExecutionException | 
InterruptedException e) {
-            throw new ProcessException(e);
-        }
+        return AzureEventHubUtils.customValidate(ACCESS_POLICY, 
POLICY_PRIMARY_KEY, context);
     }
 
     @OnStopped
-    public void tearDown() throws ProcessException {
-        for (final PartitionReceiver receiver : 
partitionToReceiverMap.values()) {
-            if (null != receiver) {
-                receiver.close();
-            }
-        }
-
-        partitionToReceiverMap.clear();
-        try {
-            if (null != eventHubClient) {
-                eventHubClient.closeSync();
-            }
-            executor.shutdown();
-        } catch (final EventHubException e) {
-            throw new ProcessException(e);
+    public void closeClient() {
+        if (eventHubConsumerClient == null) {
+            getLogger().info("Azure Event Hub Consumer Client not configured");
+        } else {
+            eventHubConsumerClient.close();
         }
     }
 
-    private ScheduledExecutorService executor;
-
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws 
ProcessException, URISyntaxException {
+    public void onScheduled(final ProcessContext context) {
         final BlockingQueue<String> partitionNames = new 
LinkedBlockingQueue<>();
         for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); 
i++) {
             partitionNames.add(String.valueOf(i));

Review Comment:
   The new API provides `EventHubConsumerClient.getPartitionIds()` method and 
it could be used instead of the `Number of Event Hub Partitions` property where 
the user has to specify the right number (which was quite poor user experience, 
in my opinion).
   We can also create a follow-up ticket for it if you prefer that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to