turcsanyip commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r977030508
##########
nifi-nar-bundles/nifi-azure-bundle/pom.xml:
##########
@@ -26,12 +26,9 @@
<packaging>pom</packaging>
<properties>
- <azure-storage.version>8.6.6</azure-storage.version>
- <azure.core.version>1.26.0</azure.core.version>
- <azure.identity.version>1.4.5</azure.identity.version>
- <!-- azure-identity depends on msal4j transitively, keep these
versions consistent -->
- <msal4j.version>1.11.0</msal4j.version>
- <azure-cosmos.version>4.26.0</azure-cosmos.version>
+ <azure.sdk.bom.version>1.2.4</azure.sdk.bom.version>
Review Comment:
Could you please bump the bom version to the latest `1.2.6`?
I still have trouble restarting the Consume processor (no messages are
processed after restart), but it seems to work with that version.
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -381,313 +366,285 @@ protected Collection<ValidationResult>
customValidate(ValidationContext validati
}
@Override
- public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
if (RECORD_READER.equals(descriptor)) {
- isRecordReaderSet = !StringUtils.isEmpty(newValue);
+ isRecordReaderSet = StringUtils.isNotEmpty(newValue);
} else if (RECORD_WRITER.equals(descriptor)) {
- isRecordWriterSet = !StringUtils.isEmpty(newValue);
- }
- }
-
- public class EventProcessorFactory implements
IEventProcessorFactory<EventProcessor> {
- @Override
- public EventProcessor createEventProcessor(PartitionContext context)
throws Exception {
- final EventProcessor eventProcessor = new EventProcessor();
- return eventProcessor;
+ isRecordWriterSet = StringUtils.isNotEmpty(newValue);
}
}
- public class EventProcessor implements IEventProcessor {
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSessionFactory sessionFactory) {
+ if (eventProcessorClient == null) {
+ eventProcessorClient = createClient(context);
+ eventProcessorClient.start();
- @Override
- public void onOpen(PartitionContext context) throws Exception {
- getLogger().info("Consumer group {} opened partition {} of {}",
- new Object[]{context.getConsumerGroupName(),
context.getPartitionId(), context.getEventHubPath()});
- }
+ processSessionFactory = sessionFactory;
Review Comment:
I'd suggest setting `processSessionFactory` before starting the client
because the client's event handler may try to use it before the variable gets
initialized (almost zero chance though).
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -307,95 +217,125 @@ public void onScheduled(final ProcessContext context)
throws ProcessException, U
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final boolean useManagedIdentity =
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
- final String connectionString;
+ final String fullyQualifiedNamespace = String.format("%s%s",
namespace, serviceBusEndpoint);
- if(useManagedIdentity){
- connectionString =
AzureEventHubUtils.getManagedIdentityConnectionString(namespace,
serviceBusEndpoint, eventHubName);
+ final EventHubClientBuilder eventHubClientBuilder = new
EventHubClientBuilder();
+
+ final String consumerGroup =
context.getProperty(CONSUMER_GROUP).getValue();
+ eventHubClientBuilder.consumerGroup(consumerGroup);
+
+ if (useManagedIdentity) {
+ final ManagedIdentityCredentialBuilder
managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
+ final ManagedIdentityCredential managedIdentityCredential =
managedIdentityCredentialBuilder.build();
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, managedIdentityCredential);
} else {
final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
- connectionString = new ConnectionStringBuilder()
- .setEndpoint(new
URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace,
serviceBusEndpoint)))
- .setEventHubName(eventHubName)
- .setSasKeyName(policyName)
- .setSasKey(policyKey).toString();
+ final AzureNamedKeyCredential azureNamedKeyCredential = new
AzureNamedKeyCredential(policyName, policyKey);
+ eventHubClientBuilder.credential(fullyQualifiedNamespace,
eventHubName, azureNamedKeyCredential);
}
+ eventHubConsumerClient = eventHubClientBuilder.buildConsumerClient();
- if(context.getProperty(ENQUEUE_TIME).isSet()) {
- configuredEnqueueTime =
Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
+ final PropertyValue enqueuedTimeProperty =
context.getProperty(ENQUEUE_TIME);
+ if (enqueuedTimeProperty.isSet()) {
+ initialEnqueuedTime =
Instant.parse(enqueuedTimeProperty.getValue());
} else {
- configuredEnqueueTime = null;
+ initialEnqueuedTime = Instant.now();
}
- if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+
+ if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
receiverFetchSize =
context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
} else {
receiverFetchSize = 100;
}
- if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+ if (context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
receiverFetchTimeout =
Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
} else {
- receiverFetchTimeout = null;
+ receiverFetchTimeout = Duration.ofMillis(60000);
}
-
- executor = Executors.newScheduledThreadPool(4);
- setupReceiver(connectionString, executor);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final BlockingQueue<String> partitionIds = this.partitionNames;
- final String partitionId = partitionIds.poll();
+ final String partitionId = partitionNames.poll();
if (partitionId == null) {
getLogger().debug("No partitions available");
return;
}
final StopWatch stopWatch = new StopWatch(true);
try {
+ final Iterable<PartitionEvent> events = receiveEvents(partitionId);
+ for (final PartitionEvent partitionEvent : events) {
+ final Map<String, String> attributes =
getAttributes(partitionEvent);
- final Iterable<EventData> receivedEvents = receiveEvents(context,
partitionId);
- if (receivedEvents == null) {
- return;
- }
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
- for (final EventData eventData : receivedEvents) {
- if (null != eventData) {
+ final EventData eventData = partitionEvent.getData();
+ final byte[] body = eventData.getBody();
+ flowFile = session.write(flowFile, outputStream ->
outputStream.write(body));
- final Map<String, String> attributes = new HashMap<>();
- FlowFile flowFile = session.create();
- final EventData.SystemProperties systemProperties =
eventData.getSystemProperties();
+ session.transfer(flowFile, REL_SUCCESS);
- if (null != systemProperties) {
- attributes.put("eventhub.enqueued.timestamp",
String.valueOf(systemProperties.getEnqueuedTime()));
- attributes.put("eventhub.offset",
systemProperties.getOffset());
- attributes.put("eventhub.sequence",
String.valueOf(systemProperties.getSequenceNumber()));
- }
+ final String transitUri = getTransitUri(partitionId);
+ session.getProvenanceReporter().receive(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ }
+ } finally {
+ partitionNames.offer(partitionId);
+ }
+ }
- final Map<String,String> applicationProperties =
AzureEventHubUtils.getApplicationProperties(eventData);
- attributes.putAll(applicationProperties);
+ /**
+ * Receive Events from specified partition is synchronized to avoid
concurrent requests for the same partition
+ *
+ * @param partitionId Partition Identifier
+ * @return Iterable of Partition Events or empty when none received
+ */
+ protected synchronized Iterable<PartitionEvent> receiveEvents(final String
partitionId) {
+ final EventPosition eventPosition;
- attributes.put("eventhub.name",
context.getProperty(EVENT_HUB_NAME).getValue());
- attributes.put("eventhub.partition", partitionId);
+ if (initialEventPosition == null) {
+ getLogger().debug("Receiving Events for Partition [{}] from
Initial Enqueued Time [{}]", partitionId, initialEnqueuedTime);
+ initialEventPosition =
EventPosition.fromEnqueuedTime(initialEnqueuedTime);
+ eventPosition = initialEventPosition;
+ } else {
+ final PartitionProperties partitionProperties =
eventHubConsumerClient.getPartitionProperties(partitionId);
+ final Instant lastEnqueuedTime =
partitionProperties.getLastEnqueuedTime();
+ getLogger().debug("Receiving Events for Partition [{}] from Last
Enqueued Time [{}]", partitionId, lastEnqueuedTime);
+ eventPosition = EventPosition.fromEnqueuedTime(lastEnqueuedTime);
Review Comment:
`eventPosition` is determined from the current latest event time here, but we
should continue where the previous receive call left off. The current
implementation skips the events published between two `onTrigger` calls.
I see the root cause is that the previous version of the library provided a
stateful `PartitionReceiver` object we stored in a map per partition
`ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap`. This
receiver object stored the last fetched event offset (or similar data), and
the next `receive()` call started with the event after that one.
Now in the new API version, we have a stateless `receiveFromPartition()`
method so I think we need to store the last offset positions for the partitions
ourselves. It can be retrieved from `EventData` (not sure whether
`getSequenceNumber()` or `getOffset()` should be used). I checked the sequence
number: the `Iterable` returned by `receiveFromPartition()` seems to emit the
events ordered by sequence number, so the last one should be the latest we need
to store.
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -381,313 +366,285 @@ protected Collection<ValidationResult>
customValidate(ValidationContext validati
}
@Override
- public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
if (RECORD_READER.equals(descriptor)) {
- isRecordReaderSet = !StringUtils.isEmpty(newValue);
+ isRecordReaderSet = StringUtils.isNotEmpty(newValue);
} else if (RECORD_WRITER.equals(descriptor)) {
- isRecordWriterSet = !StringUtils.isEmpty(newValue);
- }
- }
-
- public class EventProcessorFactory implements
IEventProcessorFactory<EventProcessor> {
- @Override
- public EventProcessor createEventProcessor(PartitionContext context)
throws Exception {
- final EventProcessor eventProcessor = new EventProcessor();
- return eventProcessor;
+ isRecordWriterSet = StringUtils.isNotEmpty(newValue);
}
}
- public class EventProcessor implements IEventProcessor {
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSessionFactory sessionFactory) {
+ if (eventProcessorClient == null) {
+ eventProcessorClient = createClient(context);
+ eventProcessorClient.start();
- @Override
- public void onOpen(PartitionContext context) throws Exception {
- getLogger().info("Consumer group {} opened partition {} of {}",
- new Object[]{context.getConsumerGroupName(),
context.getPartitionId(), context.getEventHubPath()});
- }
+ processSessionFactory = sessionFactory;
- @Override
- public void onClose(PartitionContext context, CloseReason reason)
throws Exception {
- getLogger().info("Consumer group {} closed partition {} of {}.
reason={}",
- new Object[]{context.getConsumerGroupName(),
context.getPartitionId(), context.getEventHubPath(), reason});
+ readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
}
- @Override
- public void onEvents(PartitionContext context, Iterable<EventData>
messages) throws Exception {
- final ProcessSession session =
processSessionFactory.createSession();
-
- try {
-
- final StopWatch stopWatch = new StopWatch(true);
-
- if (readerFactory != null && writerFactory != null) {
- writeRecords(context, messages, session, stopWatch);
- } else {
- writeFlowFiles(context, messages, session, stopWatch);
- }
+ // After a EventProcessor is registered successfully, nothing has to
be done at onTrigger
+ // because new sessions are created when new messages are arrived by
the EventProcessor.
+ context.yield();
+ }
- // Commit NiFi first.
- // If creating an Event Hub checkpoint failed, then the same
message can be retrieved again.
- session.commitAsync(context::checkpoint);
- } catch (Exception e) {
- getLogger().error("Unable to fully process received message
due to " + e, e);
- // FlowFiles those are already committed will not get rollback.
- session.rollback();
- }
+ @OnStopped
+ public void stopClient() {
+ if (eventProcessorClient != null) {
+ eventProcessorClient.stop();
+ eventProcessorClient = null;
+ processSessionFactory = null;
+ readerFactory = null;
+ writerFactory = null;
Review Comment:
I recommend calling the `close()` method in a try block and nulling the
fields even if the `close()` fails. Otherwise `onTrigger()` can never
reinitialize the client because it is not null.
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -205,98 +193,20 @@ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- List<ValidationResult> retVal =
AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
- return retVal;
- }
-
- protected void setupReceiver(final String connectionString, final
ScheduledExecutorService executor) throws ProcessException {
- try {
- EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
- eventHubClient =
EventHubClient.createFromConnectionStringSync(connectionString, executor);
- } catch (IOException | EventHubException e) {
- throw new ProcessException(e);
- }
- }
-
- PartitionReceiver getReceiver(final ProcessContext context, final String
partitionId) throws IOException, EventHubException, ExecutionException,
InterruptedException {
- PartitionReceiver existingReceiver =
partitionToReceiverMap.get(partitionId);
- if (existingReceiver != null) {
- return existingReceiver;
- }
-
- // we want to avoid allowing multiple threads to create Receivers
simultaneously because that could result in
- // having multiple Receivers for the same partition. So if the map
does not contain a receiver for this partition,
- // we will enter a synchronized block and check again (because once we
enter the synchronized block, we know that no
- // other thread is creating a client). If within the synchronized
block, we still do not have an entry in the map,
- // it is up to use to create the receiver, initialize it, and then put
it into the map.
- // We do not use the putIfAbsent method in order to do a CAS operation
here because we want to also initialize the
- // receiver if and only if it is not present in the map. As a result,
we need to initialize the receiver and add it
- // to the map atomically. Hence, the synchronized block.
- synchronized (this) {
- existingReceiver = partitionToReceiverMap.get(partitionId);
- if (existingReceiver != null) {
- return existingReceiver;
- }
-
- final String consumerGroupName =
context.getProperty(CONSUMER_GROUP).getValue();
-
- final PartitionReceiver receiver = eventHubClient.createReceiver(
- consumerGroupName,
- partitionId,
- EventPosition.fromEnqueuedTime(
- configuredEnqueueTime == null ? Instant.now() :
configuredEnqueueTime)).get();
-
- receiver.setReceiveTimeout(receiverFetchTimeout == null ?
Duration.ofMillis(60000) : receiverFetchTimeout);
- partitionToReceiverMap.put(partitionId, receiver);
- return receiver;
-
- }
- }
-
- /**
- * This method is here to try and isolate the Azure related code as the
PartitionReceiver cannot be mocked
- * with PowerMock due to it being final. Unfortunately it extends a base
class and does not implement an interface
- * so even if we create a MockPartitionReciver, it will not work as the
two classes are orthogonal.
- *
- * @param context - The processcontext for this processor
- * @param partitionId - The partition ID to retrieve a receiver by.
- * @return - Returns the events received from the EventBus.
- * @throws ProcessException -- If any exception is encountered, receiving
events it is wrapped in a ProcessException
- * and then that exception is thrown.
- */
- protected Iterable<EventData> receiveEvents(final ProcessContext context,
final String partitionId) throws ProcessException {
- final PartitionReceiver receiver;
- try {
- receiver = getReceiver(context, partitionId);
- return receiver.receive(receiverFetchSize).get();
- } catch (final EventHubException | IOException | ExecutionException |
InterruptedException e) {
- throw new ProcessException(e);
- }
+ return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, context);
}
@OnStopped
- public void tearDown() throws ProcessException {
- for (final PartitionReceiver receiver :
partitionToReceiverMap.values()) {
- if (null != receiver) {
- receiver.close();
- }
- }
-
- partitionToReceiverMap.clear();
- try {
- if (null != eventHubClient) {
- eventHubClient.closeSync();
- }
- executor.shutdown();
- } catch (final EventHubException e) {
- throw new ProcessException(e);
+ public void closeClient() {
+ if (eventHubConsumerClient == null) {
+ getLogger().info("Azure Event Hub Consumer Client not configured");
+ } else {
+ eventHubConsumerClient.close();
}
}
- private ScheduledExecutorService executor;
-
@OnScheduled
- public void onScheduled(final ProcessContext context) throws
ProcessException, URISyntaxException {
+ public void onScheduled(final ProcessContext context) {
final BlockingQueue<String> partitionNames = new
LinkedBlockingQueue<>();
for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger();
i++) {
partitionNames.add(String.valueOf(i));
Review Comment:
The new API provides `EventHubConsumerClient.getPartitionIds()` method and
it could be used instead of the `Number of Event Hub Partitions` property where
the user has to specify the right number (which was quite a poor user
experience, in my opinion).
We can also create a follow-up ticket for it if you prefer that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]