turcsanyip commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r975117881


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml:
##########
@@ -62,74 +62,40 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-core</artifactId>
-            <version>${azure.core.version}</version>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-identity</artifactId>
-            <version>${azure.identity.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-eventhubs</artifactId>
-            <version>${azure-eventhubs.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-keyvault</artifactId>
-            <version>${azure-keyvault.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-eventhubs-eph</artifactId>
-            <version>${azure-eventhubs-eph.version}</version>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.microsoft.azure</groupId>
-            <artifactId>azure-storage</artifactId>
+            <groupId>com.azure</groupId>
+            
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-cosmos</artifactId>
-            <version>${azure-cosmos.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
-            <version>${azure-storage-file-datalake.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-blob</artifactId>
-            <version>${azure-storage-blob.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.azure</groupId>
-                    <artifactId>azure-core</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <!-- Legacy Microsoft Azure Libraries -->
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-keyvault</artifactId>
+            <version>${azure-keyvault.version}</version>
+        </dependency>

Review Comment:
   `azure-keyvault` still brings `jsr305` in but you mentioned that it is no 
longer a transitive dependency of current libraries.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java:
##########
@@ -166,257 +152,135 @@ public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
         return propertyDescriptors;
     }
 
-    private ScheduledExecutorService executor;
-
     @OnScheduled
-    public final void setupClient(final ProcessContext context) throws 
ProcessException{
+    public final void createClient(final ProcessContext context) {
+        eventHubProducerClient = createEventHubProducerClient(context);
     }
 
     @OnStopped
-    public void tearDown() {
-        EventHubClient sender;
-        while ((sender = senderQueue.poll()) != null) {
-            sender.close();
+    public void closeClient() {
+        if (eventHubProducerClient == null) {
+            getLogger().info("Azure Event Hub Producer Client not configured");
+        } else {
+            eventHubProducerClient.close();
         }
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
-        List<ValidationResult> retVal = 
AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
-        return retVal;
+        return AzureEventHubUtils.customValidate(ACCESS_POLICY, 
POLICY_PRIMARY_KEY, context);
     }
 
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        try {
-            populateSenderQueue(context);
-        } catch (ProcessException e) {
-            context.yield();
-            throw e;
-        }
-
         final StopWatch stopWatch = new StopWatch(true);
 
         final String partitioningKeyAttributeName = 
context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
 
-        // Get N flow files
-        final int maxBatchSize = 
NumberUtils.toInt(context.getProperty(MAX_BATCH_SIZE).getValue(), 100);
-        final List<FlowFile> flowFileList = session.get(maxBatchSize);
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
 
-        // Convert and send each flow file
-        final 
BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> 
futureQueue = new LinkedBlockingQueue<>();
-        for (FlowFile flowFile : flowFileList) {
-            if (flowFile == null) {
-                continue;
-            }
+        final List<FlowFileResultCarrier<Relationship>> flowFileResults = new 
ArrayList<>();
+        for (final FlowFile flowFile : flowFileBatch) {
+            final FlowFileResultCarrier<Relationship> flowFileResult = 
handleFlowFile(flowFile, partitioningKeyAttributeName, session);
+            flowFileResults.add(flowFileResult);
+        }
 
-            futureQueue.offer(handleFlowFile(flowFile, 
partitioningKeyAttributeName, session));
+        processFlowFileResults(context, session, stopWatch, flowFileResults);
+    }
+
+    protected EventHubProducerClient createEventHubProducerClient(final 
ProcessContext context) throws ProcessException {
+        final boolean useManagedIdentity = 
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+        final String policyName, policyKey;
+        if (useManagedIdentity) {
+            policyName = AzureEventHubUtils.MANAGED_IDENTITY_POLICY;
+            policyKey = null;
+        } else {
+            policyName = context.getProperty(ACCESS_POLICY).getValue();
+            policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
         }
+        final String namespace = context.getProperty(NAMESPACE).getValue();
+        final String serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+        final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
 
-        waitForAllFutures(context, session, stopWatch, futureQueue);
+        try {
+            final EventHubClientBuilder eventHubClientBuilder = new 
EventHubClientBuilder();
+
+            final String fullyQualifiedNamespace = String.format("%s%s", 
namespace, serviceBusEndpoint);
+            if (AzureEventHubUtils.MANAGED_IDENTITY_POLICY.equals(policyName)) 
{
+                final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+                final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
+            } else {
+                final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
+                eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
+            }

Review Comment:
   I think the code for the Managed Identity case could be simplified:
   - no `MANAGED_IDENTITY_POLICY` constant needed at all
   - `if (useManagedIdentity)` block (line 195-201) can be deleted
   - and modify this `if`:
   ```suggestion
               if (useManagedIdentity) {
                   final ManagedIdentityCredentialBuilder 
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
                   final ManagedIdentityCredential managedIdentityCredential = 
managedIdentityCredentialBuilder.build();
                   eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, managedIdentityCredential);
               } else {
                   final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
                   final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
                   final AzureNamedKeyCredential azureNamedKeyCredential = new 
AzureNamedKeyCredential(policyName, policyKey);
                   eventHubClientBuilder.credential(fullyQualifiedNamespace, 
eventHubName, azureNamedKeyCredential);
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to