turcsanyip commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r975117881
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml:
##########
@@ -62,74 +62,40 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
- <version>${azure.core.version}</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
- <version>${azure.identity.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.azure</groupId>
- <artifactId>azure-core</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
- <groupId>com.microsoft.azure</groupId>
- <artifactId>azure-eventhubs</artifactId>
- <version>${azure-eventhubs.version}</version>
- </dependency>
- <dependency>
- <groupId>com.microsoft.azure</groupId>
- <artifactId>azure-keyvault</artifactId>
- <version>${azure-keyvault.version}</version>
- </dependency>
- <dependency>
- <groupId>com.microsoft.azure</groupId>
- <artifactId>azure-eventhubs-eph</artifactId>
- <version>${azure-eventhubs-eph.version}</version>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-messaging-eventhubs</artifactId>
</dependency>
<dependency>
- <groupId>com.microsoft.azure</groupId>
- <artifactId>azure-storage</artifactId>
+ <groupId>com.azure</groupId>
+
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
- <version>${azure-cosmos.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.azure</groupId>
- <artifactId>azure-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
- <version>${azure-storage-file-datalake.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.azure</groupId>
- <artifactId>azure-core</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
- <version>${azure-storage-blob.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.azure</groupId>
- <artifactId>azure-core</artifactId>
- </exclusion>
- </exclusions>
+ </dependency>
+ <!-- Legacy Microsoft Azure Libraries -->
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-keyvault</artifactId>
+ <version>${azure-keyvault.version}</version>
+ </dependency>
Review Comment:
`azure-keyvault` still brings `jsr305` in but you mentioned that it is no
longer a transitive dependency of current libraries.
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java:
##########
@@ -166,257 +152,135 @@ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
- private ScheduledExecutorService executor;
-
@OnScheduled
- public final void setupClient(final ProcessContext context) throws
ProcessException{
+ public final void createClient(final ProcessContext context) {
+ eventHubProducerClient = createEventHubProducerClient(context);
}
@OnStopped
- public void tearDown() {
- EventHubClient sender;
- while ((sender = senderQueue.poll()) != null) {
- sender.close();
+ public void closeClient() {
+ if (eventHubProducerClient == null) {
+ getLogger().info("Azure Event Hub Producer Client not configured");
+ } else {
+ eventHubProducerClient.close();
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- List<ValidationResult> retVal =
AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
- return retVal;
+ return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, context);
}
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- try {
- populateSenderQueue(context);
- } catch (ProcessException e) {
- context.yield();
- throw e;
- }
-
final StopWatch stopWatch = new StopWatch(true);
final String partitioningKeyAttributeName =
context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
- // Get N flow files
- final int maxBatchSize =
NumberUtils.toInt(context.getProperty(MAX_BATCH_SIZE).getValue(), 100);
- final List<FlowFile> flowFileList = session.get(maxBatchSize);
+ final int maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).asInteger();
+ final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
- // Convert and send each flow file
- final
BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>>
futureQueue = new LinkedBlockingQueue<>();
- for (FlowFile flowFile : flowFileList) {
- if (flowFile == null) {
- continue;
- }
+ final List<FlowFileResultCarrier<Relationship>> flowFileResults = new
ArrayList<>();
+ for (final FlowFile flowFile : flowFileBatch) {
+ final FlowFileResultCarrier<Relationship> flowFileResult =
handleFlowFile(flowFile, partitioningKeyAttributeName, session);
+ flowFileResults.add(flowFileResult);
+ }
- futureQueue.offer(handleFlowFile(flowFile,
partitioningKeyAttributeName, session));
+ processFlowFileResults(context, session, stopWatch, flowFileResults);
+ }
+
+ protected EventHubProducerClient createEventHubProducerClient(final
ProcessContext context) throws ProcessException {
+ final boolean useManagedIdentity =
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+ final String policyName, policyKey;
+ if (useManagedIdentity) {
+ policyName = AzureEventHubUtils.MANAGED_IDENTITY_POLICY;
+ policyKey = null;
+ } else {
+ policyName = context.getProperty(ACCESS_POLICY).getValue();
+ policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
}
+ final String namespace = context.getProperty(NAMESPACE).getValue();
+ final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+ final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
- waitForAllFutures(context, session, stopWatch, futureQueue);
+ try {
+ final EventHubClientBuilder eventHubClientBuilder = new
EventHubClientBuilder();
+
+ final String fullyQualifiedNamespace = String.format("%s%s",
namespace, serviceBusEndpoint);
+ if (AzureEventHubUtils.MANAGED_IDENTITY_POLICY.equals(policyName))
{
+ final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+ final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
+ } else {
+ final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
+ }
Review Comment:
I think the code for the Managed Identity case could be simplified:
- no `MANAGED_IDENTITY_POLICY` constant needed at all
- `if (useManagedIdentity)` block (line 195-201) can be deleted
- and modify this `if`:
```suggestion
if (useManagedIdentity) {
final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
} else {
final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]