exceptionfactory commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r977179923
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -307,95 +217,125 @@ public void onScheduled(final ProcessContext context)
throws ProcessException, U
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final boolean useManagedIdentity =
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
- final String connectionString;
+ final String fullyQualifiedNamespace = String.format("%s%s",
namespace, serviceBusEndpoint);
- if(useManagedIdentity){
- connectionString =
AzureEventHubUtils.getManagedIdentityConnectionString(namespace,
serviceBusEndpoint, eventHubName);
+ final EventHubClientBuilder eventHubClientBuilder = new
EventHubClientBuilder();
+
+ final String consumerGroup =
context.getProperty(CONSUMER_GROUP).getValue();
+ eventHubClientBuilder.consumerGroup(consumerGroup);
+
+ if (useManagedIdentity) {
+ final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+ final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
} else {
final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
- connectionString = new ConnectionStringBuilder()
- .setEndpoint(new
URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace,
serviceBusEndpoint)))
- .setEventHubName(eventHubName)
- .setSasKeyName(policyName)
- .setSasKey(policyKey).toString();
+ final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
}
+ eventHubConsumerClient = eventHubClientBuilder.buildConsumerClient();
- if(context.getProperty(ENQUEUE_TIME).isSet()) {
- configuredEnqueueTime =
Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
+ final PropertyValue enqueuedTimeProperty =
context.getProperty(ENQUEUE_TIME);
+ if (enqueuedTimeProperty.isSet()) {
+ initialEnqueuedTime =
Instant.parse(enqueuedTimeProperty.getValue());
} else {
- configuredEnqueueTime = null;
+ initialEnqueuedTime = Instant.now();
}
- if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+
+ if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
receiverFetchSize =
context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
} else {
receiverFetchSize = 100;
}
- if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+ if (context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
receiverFetchTimeout =
Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
} else {
- receiverFetchTimeout = null;
+ receiverFetchTimeout = Duration.ofMillis(60000);
}
-
- executor = Executors.newScheduledThreadPool(4);
- setupReceiver(connectionString, executor);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final BlockingQueue<String> partitionIds = this.partitionNames;
- final String partitionId = partitionIds.poll();
+ final String partitionId = partitionNames.poll();
if (partitionId == null) {
getLogger().debug("No partitions available");
return;
}
final StopWatch stopWatch = new StopWatch(true);
try {
+ final Iterable<PartitionEvent> events = receiveEvents(partitionId);
+ for (final PartitionEvent partitionEvent : events) {
+ final Map<String, String> attributes =
getAttributes(partitionEvent);
- final Iterable<EventData> receivedEvents = receiveEvents(context,
partitionId);
- if (receivedEvents == null) {
- return;
- }
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
- for (final EventData eventData : receivedEvents) {
- if (null != eventData) {
+ final EventData eventData = partitionEvent.getData();
+ final byte[] body = eventData.getBody();
+ flowFile = session.write(flowFile, outputStream ->
outputStream.write(body));
- final Map<String, String> attributes = new HashMap<>();
- FlowFile flowFile = session.create();
- final EventData.SystemProperties systemProperties =
eventData.getSystemProperties();
+ session.transfer(flowFile, REL_SUCCESS);
- if (null != systemProperties) {
- attributes.put("eventhub.enqueued.timestamp",
String.valueOf(systemProperties.getEnqueuedTime()));
- attributes.put("eventhub.offset",
systemProperties.getOffset());
- attributes.put("eventhub.sequence",
String.valueOf(systemProperties.getSequenceNumber()));
- }
+ final String transitUri = getTransitUri(partitionId);
+ session.getProvenanceReporter().receive(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ }
+ } finally {
+ partitionNames.offer(partitionId);
+ }
+ }
- final Map<String,String> applicationProperties =
AzureEventHubUtils.getApplicationProperties(eventData);
- attributes.putAll(applicationProperties);
+ /**
+ * Receive Events from specified partition is synchronized to avoid
concurrent requests for the same partition
+ *
+ * @param partitionId Partition Identifier
+ * @return Iterable of Partition Events or empty when none received
+ */
+ protected synchronized Iterable<PartitionEvent> receiveEvents(final String
partitionId) {
+ final EventPosition eventPosition;
- attributes.put("eventhub.name",
context.getProperty(EVENT_HUB_NAME).getValue());
- attributes.put("eventhub.partition", partitionId);
+ if (initialEventPosition == null) {
+ getLogger().debug("Receiving Events for Partition [{}] from
Initial Enqueued Time [{}]", partitionId, initialEnqueuedTime);
+ initialEventPosition =
EventPosition.fromEnqueuedTime(initialEnqueuedTime);
+ eventPosition = initialEventPosition;
+ } else {
+ final PartitionProperties partitionProperties =
eventHubConsumerClient.getPartitionProperties(partitionId);
+ final Instant lastEnqueuedTime =
partitionProperties.getLastEnqueuedTime();
+ getLogger().debug("Receiving Events for Partition [{}] from Last
Enqueued Time [{}]", partitionId, lastEnqueuedTime);
+ eventPosition = EventPosition.fromEnqueuedTime(lastEnqueuedTime);
Review Comment:
Thanks for clarifying the issue and pointing out the differences. I updated
`GetAzureEventHub` to track the last Event Position for each Partition using a
`ConcurrentHashMap`. The map is initialized with the initial enqueued time in
`onScheduled`, and each Partition's Event Position is subsequently updated based
on the last Sequence Number observed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]