exceptionfactory commented on code in PR #6319:
URL: https://github.com/apache/nifi/pull/6319#discussion_r977179227
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java:
##########
@@ -205,98 +193,20 @@ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- List<ValidationResult> retVal =
AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
- return retVal;
- }
-
- protected void setupReceiver(final String connectionString, final
ScheduledExecutorService executor) throws ProcessException {
- try {
- EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
- eventHubClient =
EventHubClient.createFromConnectionStringSync(connectionString, executor);
- } catch (IOException | EventHubException e) {
- throw new ProcessException(e);
- }
- }
-
- PartitionReceiver getReceiver(final ProcessContext context, final String
partitionId) throws IOException, EventHubException, ExecutionException,
InterruptedException {
- PartitionReceiver existingReceiver =
partitionToReceiverMap.get(partitionId);
- if (existingReceiver != null) {
- return existingReceiver;
- }
-
- // we want to avoid allowing multiple threads to create Receivers
simultaneously because that could result in
- // having multiple Receivers for the same partition. So if the map
does not contain a receiver for this partition,
- // we will enter a synchronized block and check again (because once we
enter the synchronized block, we know that no
- // other thread is creating a client). If within the synchronized
block, we still do not have an entry in the map,
- // it is up to use to create the receiver, initialize it, and then put
it into the map.
- // We do not use the putIfAbsent method in order to do a CAS operation
here because we want to also initialize the
- // receiver if and only if it is not present in the map. As a result,
we need to initialize the receiver and add it
- // to the map atomically. Hence, the synchronized block.
- synchronized (this) {
- existingReceiver = partitionToReceiverMap.get(partitionId);
- if (existingReceiver != null) {
- return existingReceiver;
- }
-
- final String consumerGroupName =
context.getProperty(CONSUMER_GROUP).getValue();
-
- final PartitionReceiver receiver = eventHubClient.createReceiver(
- consumerGroupName,
- partitionId,
- EventPosition.fromEnqueuedTime(
- configuredEnqueueTime == null ? Instant.now() :
configuredEnqueueTime)).get();
-
- receiver.setReceiveTimeout(receiverFetchTimeout == null ?
Duration.ofMillis(60000) : receiverFetchTimeout);
- partitionToReceiverMap.put(partitionId, receiver);
- return receiver;
-
- }
- }
-
- /**
- * This method is here to try and isolate the Azure related code as the
PartitionReceiver cannot be mocked
- * with PowerMock due to it being final. Unfortunately it extends a base
class and does not implement an interface
- * so even if we create a MockPartitionReciver, it will not work as the
two classes are orthogonal.
- *
- * @param context - The processcontext for this processor
- * @param partitionId - The partition ID to retrieve a receiver by.
- * @return - Returns the events received from the EventBus.
- * @throws ProcessException -- If any exception is encountered, receiving
events it is wrapped in a ProcessException
- * and then that exception is thrown.
- */
- protected Iterable<EventData> receiveEvents(final ProcessContext context,
final String partitionId) throws ProcessException {
- final PartitionReceiver receiver;
- try {
- receiver = getReceiver(context, partitionId);
- return receiver.receive(receiverFetchSize).get();
- } catch (final EventHubException | IOException | ExecutionException |
InterruptedException e) {
- throw new ProcessException(e);
- }
+ return AzureEventHubUtils.customValidate(ACCESS_POLICY,
POLICY_PRIMARY_KEY, context);
}
@OnStopped
- public void tearDown() throws ProcessException {
- for (final PartitionReceiver receiver :
partitionToReceiverMap.values()) {
- if (null != receiver) {
- receiver.close();
- }
- }
-
- partitionToReceiverMap.clear();
- try {
- if (null != eventHubClient) {
- eventHubClient.closeSync();
- }
- executor.shutdown();
- } catch (final EventHubException e) {
- throw new ProcessException(e);
+ public void closeClient() {
+ if (eventHubConsumerClient == null) {
+ getLogger().info("Azure Event Hub Consumer Client not configured");
+ } else {
+ eventHubConsumerClient.close();
}
}
- private ScheduledExecutorService executor;
-
@OnScheduled
- public void onScheduled(final ProcessContext context) throws
ProcessException, URISyntaxException {
+ public void onScheduled(final ProcessContext context) {
final BlockingQueue<String> partitionNames = new
LinkedBlockingQueue<>();
for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger();
i++) {
partitionNames.add(String.valueOf(i));
Review Comment:
Thanks for the suggestion; this is a much better user experience. I switched
the implementation to use `getPartitionIds()` and set the existing property to
not required, with an updated description indicating that it is deprecated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]