This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c69faf  NIFI-6597: Upgrade Azure Event Hub version and code. - Lazy 
instantiation for PutAzureEventHub. Also add explanation for thread pool 
needed for EventHubClient.
5c69faf is described below

commit 5c69faf9bb46a9be9b3e7616f981c11437d33ac1
Author: mysunnytime <[email protected]>
AuthorDate: Mon Aug 19 16:51:04 2019 -0700

    NIFI-6597: Upgrade Azure Event Hub version and code.
    - Lazy instantiation for PutAzureEventHub. Also add explanation for thread 
pool needed for EventHubClient.
---
 .../nifi-azure-processors/pom.xml                  |  5 +-
 .../azure/eventhub/ConsumeAzureEventHub.java       | 14 ++--
 .../azure/eventhub/GetAzureEventHub.java           | 85 +++++++++++----------
 .../azure/eventhub/PutAzureEventHub.java           | 89 +++++++++++++---------
 .../azure/eventhub/GetAzureEventHubTest.java       | 34 +++------
 .../azure/eventhub/PutAzureEventHubTest.java       | 24 +++---
 .../azure/eventhub/TestConsumeAzureEventHub.java   | 44 ++++-------
 7 files changed, 146 insertions(+), 149 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 6df6b5b..4d30e6b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -20,7 +20,8 @@
     <artifactId>nifi-azure-processors</artifactId>
     <packaging>jar</packaging>
     <properties>
-        <azure-eventhubs.version>0.14.4</azure-eventhubs.version>
+        <azure-eventhubs.version>2.3.2</azure-eventhubs.version>
+        <azure-eventhubs-eph.version>2.5.2</azure-eventhubs-eph.version>
     </properties>
     <dependencies>
         <dependency>
@@ -57,7 +58,7 @@
         <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs-eph</artifactId>
-            <version>${azure-eventhubs.version}</version>
+            <version>${azure-eventhubs-eph.version}</version>
         </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 203dd5b..800f26d 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -23,8 +23,8 @@ import 
com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
 import com.microsoft.azure.eventprocessorhost.IEventProcessor;
 import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
 import com.microsoft.azure.eventprocessorhost.PartitionContext;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
+import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -243,7 +243,6 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
             .required(false)
             .build();
 
-
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles received from Event Hub.")
@@ -531,7 +530,6 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
                             " consumerGroupName={}, exception={}",
                     new Object[]{context.getEventHubPath(), 
context.getPartitionId(), context.getConsumerGroupName(), e}, e);
         }
-
     }
 
     @Override
@@ -606,9 +604,9 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
         final EventProcessorOptions options = new EventProcessorOptions();
         final String initialOffset = 
context.getProperty(INITIAL_OFFSET).getValue();
         if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(initialOffset)) {
-            options.setInitialOffsetProvider(options.new 
StartOfStreamInitialOffsetProvider());
+            options.setInitialPositionProvider(options.new 
StartOfStreamInitialPositionProvider());
         } else if 
(INITIAL_OFFSET_END_OF_STREAM.getValue().equals(initialOffset)){
-            options.setInitialOffsetProvider(options.new 
EndOfStreamInitialOffsetProvider());
+            options.setInitialPositionProvider(options.new 
EndOfStreamInitialPositionProvider());
         } else {
             throw new IllegalArgumentException("Initial offset " + 
initialOffset + " is not allowed.");
         }
@@ -629,7 +627,7 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
 
         final String storageConnectionString = 
String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, 
storageAccountKey);
 
-        final ConnectionStringBuilder eventHubConnectionString = new 
ConnectionStringBuilder(namespaceName, eventHubName, sasName, sasKey);
+        final ConnectionStringBuilder eventHubConnectionString = new 
ConnectionStringBuilder().setNamespaceName(namespaceName).setEventHubName( 
eventHubName).setSasKeyName(sasName).setSasKey(sasKey);
 
         eventProcessorHost = new EventProcessorHost(consumerHostname, 
eventHubName, consumerGroupName, eventHubConnectionString.toString(), 
storageConnectionString, containerName);
 
@@ -639,7 +637,6 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
                     new Object[]{eventHubName, consumerGroupName, 
e.getPartitionId(), e.getAction(), e.getHostname()}, e.getException());
         });
 
-
         eventProcessorHost.registerEventProcessorFactory(new 
EventProcessorFactory(), options).get();
     }
 
@@ -652,5 +649,4 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
             throw new IllegalArgumentException(String.format("'%s' is 
required, but not specified.", property.getDisplayName()));
         }
     }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 3334703..5843cbe 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -16,11 +16,35 @@
  */
 package org.apache.nifi.processors.azure.eventhub;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.EventPosition;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -40,27 +64,9 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 @Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", 
"streams"})
-@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, 
writing the contents of the Azure message to the content of the FlowFile")
+@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, 
writing the contents of the Azure message to the content of the FlowFile. "
+        + "Note: Please be aware that this processor creates a thread pool of 
4 threads for Event Hub Client. They will be extra threads other than the 
concurrent tasks scheduled for this processor.")
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @WritesAttributes({
         @WritesAttribute(attribute = "eventhub.enqueued.timestamp", 
description = "The time (in milliseconds since epoch, UTC) at which the message 
was enqueued in the Azure Event Hub"),
@@ -70,7 +76,6 @@ import java.util.concurrent.TimeUnit;
         @WritesAttribute(attribute = "eventhub.partition", description = "The 
name of the Azure Partition from which the message was pulled")
 })
 public class GetAzureEventHub extends AbstractProcessor {
-
     static final PropertyDescriptor EVENT_HUB_NAME = new 
PropertyDescriptor.Builder()
             .name("Event Hub Name")
             .description("The name of the Azure Event Hub to pull messages 
from")
@@ -128,9 +133,9 @@ public class GetAzureEventHub extends AbstractProcessor {
 
     static final PropertyDescriptor ENQUEUE_TIME = new 
PropertyDescriptor.Builder()
             .name("Event Hub Message Enqueue Time")
-            .description("A timestamp (ISO-8061 Instant) formatted as 
YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
+            .description("A timestamp (ISO-8601 Instant) formatted as 
YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
                     + "should have been enqueued in the EventHub to start 
reading from")
-            .addValidator(StandardValidators.ISO8061_INSTANT_VALIDATOR)
+            .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .required(false)
             .build();
@@ -198,16 +203,16 @@ public class GetAzureEventHub extends AbstractProcessor {
         return propertyDescriptors;
     }
 
-
-    protected void setupReceiver(final String connectionString) throws 
ProcessException {
+    protected void setupReceiver(final String connectionString, final 
ScheduledExecutorService executor) throws ProcessException {
         try {
-            eventHubClient = 
EventHubClient.createFromConnectionString(connectionString).get();
-        } catch (InterruptedException | ExecutionException | IOException | 
ServiceBusException e) {
+            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
+            eventHubClient = EventHubClient.createSync(connectionString, 
executor);
+        } catch (IOException | EventHubException e) {
             throw new ProcessException(e);
         }
     }
 
-    PartitionReceiver getReceiver(final ProcessContext context, final String 
partitionId) throws IOException, ServiceBusException, ExecutionException, 
InterruptedException {
+    PartitionReceiver getReceiver(final ProcessContext context, final String 
partitionId) throws IOException, EventHubException, ExecutionException, 
InterruptedException {
         PartitionReceiver existingReceiver = 
partitionToReceiverMap.get(partitionId);
         if (existingReceiver != null) {
             return existingReceiver;
@@ -232,7 +237,8 @@ public class GetAzureEventHub extends AbstractProcessor {
             final PartitionReceiver receiver = eventHubClient.createReceiver(
                     consumerGroupName,
                     partitionId,
-                    configuredEnqueueTime == null ? Instant.now() : 
configuredEnqueueTime).get();
+                    EventPosition.fromEnqueuedTime(
+                            configuredEnqueueTime == null ? Instant.now() : 
configuredEnqueueTime)).get();
 
             receiver.setReceiveTimeout(receiverFetchTimeout == null ? 
Duration.ofMillis(60000) : receiverFetchTimeout);
             partitionToReceiverMap.put(partitionId, receiver);
@@ -257,7 +263,7 @@ public class GetAzureEventHub extends AbstractProcessor {
         try {
             receiver = getReceiver(context, partitionId);
             return receiver.receive(receiverFetchSize).get();
-        } catch (final IOException | ServiceBusException | ExecutionException 
| InterruptedException e) {
+        } catch (final EventHubException | IOException | ExecutionException | 
InterruptedException e) {
             throw new ProcessException(e);
         }
     }
@@ -275,11 +281,14 @@ public class GetAzureEventHub extends AbstractProcessor {
             if (null != eventHubClient) {
                 eventHubClient.closeSync();
             }
-        } catch (final ServiceBusException e) {
+            executor.shutdown();
+        } catch (final EventHubException e) {
             throw new ProcessException(e);
         }
     }
 
+    private ScheduledExecutorService executor;
+
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws 
ProcessException, URISyntaxException {
         final BlockingQueue<String> partitionNames = new 
LinkedBlockingQueue<>();
@@ -294,8 +303,6 @@ public class GetAzureEventHub extends AbstractProcessor {
         final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
         final String serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
 
-
-
         if(context.getProperty(ENQUEUE_TIME).isSet()) {
             configuredEnqueueTime = 
Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
         } else {
@@ -312,11 +319,12 @@ public class GetAzureEventHub extends AbstractProcessor {
             receiverFetchTimeout = null;
         }
 
-        final String connectionString = new ConnectionStringBuilder(new 
URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName, 
policyKey).toString();
-        setupReceiver(connectionString);
+        executor = Executors.newScheduledThreadPool(4);
+        final String connectionString = new 
ConnectionStringBuilder().setEndpoint(
+            new 
URI("amqps://"+namespace+serviceBusEndpoint)).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
+        setupReceiver(connectionString, executor);
     }
 
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         final BlockingQueue<String> partitionIds = this.partitionNames;
@@ -370,5 +378,4 @@ public class GetAzureEventHub extends AbstractProcessor {
             partitionIds.offer(partitionId);
         }
     }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index 6ebed60..e6de648 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -16,16 +16,30 @@
  */
 package org.apache.nifi.processors.azure.eventhub;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.IllegalConnectionStringFormatException;
-import com.microsoft.azure.servicebus.ServiceBusException;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.IllegalConnectionStringFormatException;
+import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -42,23 +56,12 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-
 @SupportsBatching
 @Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", 
"streaming"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure 
Event Hub. Note: the content of the FlowFile will be buffered into memory 
before being sent, "
-        + "so care should be taken to avoid sending FlowFiles to this 
Processor that exceed the amount of Java Heap Space available.")
+        + "so care should be taken to avoid sending FlowFiles to this 
Processor that exceed the amount of Java Heap Space available. "
+        + "Also please be aware that this processor creates a thread pool of 4 
threads for Event Hub Client. They will be extra threads other than the 
concurrent tasks scheduled for this processor.")
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
 public class PutAzureEventHub extends AbstractProcessor {
     static final PropertyDescriptor EVENT_HUB_NAME = new 
PropertyDescriptor.Builder()
@@ -131,22 +134,10 @@ public class PutAzureEventHub extends AbstractProcessor {
         return propertyDescriptors;
     }
 
+    private ScheduledExecutorService executor;
+
     @OnScheduled
     public final void setupClient(final ProcessContext context) throws 
ProcessException{
-        final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
-        final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
-        final String namespace = context.getProperty(NAMESPACE).getValue();
-        final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
-
-
-        final int numThreads = context.getMaxConcurrentTasks();
-        senderQueue = new LinkedBlockingQueue<>(numThreads);
-        for (int i = 0; i < numThreads; i++) {
-            final EventHubClient client = createEventHubClient(namespace, 
eventHubName, policyName, policyKey);
-            if(null != client) {
-                senderQueue.offer(client);
-            }
-        }
     }
 
     @OnStopped
@@ -159,6 +150,22 @@ public class PutAzureEventHub extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        if(senderQueue.size() == 0){
+            final int numThreads = context.getMaxConcurrentTasks();
+            senderQueue = new LinkedBlockingQueue<>(numThreads);
+            executor = Executors.newScheduledThreadPool(4);
+            final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
+            final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+            final String namespace = context.getProperty(NAMESPACE).getValue();
+            final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
+            for (int i = 0; i < numThreads; i++) {
+                final EventHubClient client = createEventHubClient(namespace, 
eventHubName, policyName, policyKey, executor);
+                if(null != client) {
+                    senderQueue.offer(client);
+                }
+            }
+        }
+
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
@@ -183,25 +190,32 @@ public class PutAzureEventHub extends AbstractProcessor {
 
     }
 
-    protected EventHubClient createEventHubClient(final String namespace, 
final String eventHubName, final String policyName, final String policyKey) 
throws ProcessException{
+    protected EventHubClient createEventHubClient(
+        final String namespace,
+        final String eventHubName,
+        final String policyName,
+        final String policyKey,
+        final ScheduledExecutorService executor)
+        throws ProcessException{
 
         try {
-            return 
EventHubClient.createFromConnectionString(getConnectionString(namespace, 
eventHubName, policyName, policyKey)).get();
-        } catch (InterruptedException | ExecutionException | IOException | 
ServiceBusException | IllegalConnectionStringFormatException e) {
+            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
+            return EventHubClient.createSync(getConnectionString(namespace, 
eventHubName, policyName, policyKey), executor);
+        } catch (IOException | EventHubException | 
IllegalConnectionStringFormatException e) {
             getLogger().error("Failed to create EventHubClient due to {}", e);
             throw new ProcessException(e);
         }
     }
     protected String getConnectionString(final String namespace, final String 
eventHubName, final String policyName, final String policyKey){
-        return new ConnectionStringBuilder(namespace, eventHubName, 
policyName, policyKey).toString();
+        return new 
ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
     }
     protected void sendMessage(final byte[] buffer) throws ProcessException {
 
         final EventHubClient sender = senderQueue.poll();
         if(null != sender) {
             try {
-                sender.sendSync(new EventData(buffer));
-            } catch (final ServiceBusException sbe) {
+                sender.sendSync(EventData.create(buffer));
+            } catch (final EventHubException sbe) {
                 throw new ProcessException("Caught exception trying to send 
message to eventbus", sbe);
             } finally {
                 senderQueue.offer(sender);
@@ -209,6 +223,5 @@ public class PutAzureEventHub extends AbstractProcessor {
         }else{
             throw new ProcessException("No EventHubClients are configured for 
sending");
         }
-
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index df47d5f..6e55f60 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -19,11 +19,10 @@ package org.apache.nifi.processors.azure.eventhub;
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventData.SystemProperties;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ServiceBusException;
-import com.microsoft.azure.servicebus.amqp.AmqpConstants;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.impl.AmqpConstants;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -31,8 +30,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
@@ -41,9 +40,7 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
-
 public class GetAzureEventHubTest {
-
     private static final String namespaceName = "nifi-azure-hub";
     private static final String eventHubName = "get-test";
     private static final String sasKeyName = "bogus-policy";
@@ -158,11 +155,11 @@ public class GetAzureEventHubTest {
         boolean received = true;
 
         @Override
-        protected void setupReceiver(final String connectionString) throws 
ProcessException{
+        protected void setupReceiver(final String connectionString, final 
ScheduledExecutorService executor) throws ProcessException{
             //do nothing
         }
         @Override
-        protected PartitionReceiver getReceiver(final ProcessContext context, 
final String partitionId) throws IOException, ServiceBusException, 
ExecutionException, InterruptedException {
+        protected PartitionReceiver getReceiver(final ProcessContext context, 
final String partitionId) throws IOException, EventHubException, 
ExecutionException, InterruptedException {
             if(getReceiverThrow){
                 throw new IOException("Could not create receiver");
             }
@@ -179,7 +176,7 @@ public class GetAzureEventHubTest {
             }
             final LinkedList<EventData> receivedEvents = new LinkedList<>();
             for(int i = 0; i < 10; i++){
-                EventData eventData = new EventData(String.format("test event 
number: %d", i).getBytes());
+                EventData eventData = EventData.create(String.format("test 
event number: %d", i).getBytes());
                 if (received) {
                     HashMap<String, Object> properties = new HashMap<>();
                     
properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, 
PARTITION_KEY_VALUE);
@@ -188,26 +185,18 @@ public class GetAzureEventHubTest {
                     
properties.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, 
ENQUEUED_TIME_VALUE);
 
                     SystemProperties systemProperties = new 
SystemProperties(properties);
-                    Field systemPropertiesField = 
FieldUtils.getDeclaredField(EventData.class, "systemProperties", true);
-                    try {
-                        systemPropertiesField.set(eventData, systemProperties);
-                    } catch (IllegalAccessException e) {
-                        throw new ProcessException("Could not set 
systemProperties on EventData", e);
-                    }
+                    eventData.setSystemProperties(systemProperties);
                 }
                 receivedEvents.add(eventData);
             }
 
             return receivedEvents;
-
         }
     }
 
     public static class MockGetAzureEventHubNoPartitions extends 
GetAzureEventHub{
-
-
         @Override
-        protected void setupReceiver(final String connectionString) throws 
ProcessException{
+        protected void setupReceiver(final String connectionString, final 
ScheduledExecutorService executor) throws ProcessException{
             //do nothing
         }
 
@@ -215,10 +204,10 @@ public class GetAzureEventHubTest {
         public void onScheduled(final ProcessContext context) throws 
ProcessException {
 
         }
+        @Override
+        public void tearDown() throws ProcessException {
+        }
     }
-
-
-
     private void setUpStandardTestConfig() {
         testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,eventHubName);
         testRunner.setProperty(GetAzureEventHub.NAMESPACE,namespaceName);
@@ -227,5 +216,4 @@ public class GetAzureEventHubTest {
         testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
         testRunner.assertValid();
     }
-
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 1129ac7..ab556c5 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -23,6 +23,8 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 public class PutAzureEventHubTest {
     private static final String namespaceName = "nifi-azure-hub";
     private static final String eventHubName = "get-test";
@@ -51,13 +53,10 @@ public class PutAzureEventHubTest {
     }
     @Test
     public void verifyRelationships(){
-
         assert(2 == processor.getRelationships().size());
-
     }
     @Test
     public void testNoFlow() {
-
         setUpStandardTestConfig();
         testRunner.run(1, true);
         testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_SUCCESS, 
0);
@@ -65,7 +64,6 @@ public class PutAzureEventHubTest {
     }
     @Test
     public void testNormalFlow(){
-
         setUpStandardTestConfig();
         String flowFileContents = "TEST MESSAGE";
         testRunner.enqueue(flowFileContents);
@@ -76,7 +74,6 @@ public class PutAzureEventHubTest {
     }
     @Test
     public void testSendMessageThrows() {
-
         PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub 
throwingProcessor = new 
PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub();
         testRunner = TestRunners.newTestRunner(throwingProcessor);
         setUpStandardTestConfig();
@@ -89,7 +86,6 @@ public class PutAzureEventHubTest {
 
     @Test(expected = AssertionError.class)
     public void testBadConnectionString() {
-
         PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub 
badConnectionStringProcessor = new 
PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub();
         testRunner = TestRunners.newTestRunner(badConnectionStringProcessor);
         setUpStandardTestConfig();
@@ -97,14 +93,18 @@ public class PutAzureEventHubTest {
     }
 
     private static class MockPutAzureEventHub extends PutAzureEventHub{
-
         byte[] receivedBuffer = null;
         byte[] getReceivedBuffer(){
             return receivedBuffer;
         }
 
         @Override
-        protected EventHubClient createEventHubClient(final String namespace, 
final String eventHubName, final String policyName, final String policyKey) 
throws ProcessException {
+        protected EventHubClient createEventHubClient(
+            final String namespace,
+            final String eventHubName,
+            final String policyName,
+            final String policyKey,
+            final ScheduledExecutorService executor) throws ProcessException {
             return null;
         }
 
@@ -115,12 +115,16 @@ public class PutAzureEventHubTest {
     }
     private static class OnSendThrowingMockPutAzureEventHub extends 
PutAzureEventHub{
         @Override
-        protected EventHubClient createEventHubClient(final String namespace, 
final String eventHubName, final String policyName, final String policyKey) 
throws ProcessException {
+        protected EventHubClient createEventHubClient(
+            final String namespace,
+            final String eventHubName,
+            final String policyName,
+            final String policyKey,
+            final ScheduledExecutorService executor) throws ProcessException {
             return null;
         }
     }
     private static class BogusConnectionStringMockPutAzureEventHub extends 
PutAzureEventHub{
-
         @Override
         protected String getConnectionString(final String namespace, final 
String eventHubName, final String policyName, final String policyKey){
             return "Bogus Connection String";
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 6f35093..e7edef7 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -66,7 +66,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestConsumeAzureEventHub {
-
     private ConsumeAzureEventHub.EventProcessor eventProcessor;
     private MockProcessSession processSession;
     private SharedSessionState sharedState;
@@ -101,8 +100,7 @@ public class TestConsumeAzureEventHub {
 
     @Test
     public void testReceiveOne() throws Exception {
-
-        final Iterable<EventData> eventDataList = Arrays.asList(new 
EventData("one".getBytes(StandardCharsets.UTF_8)));
+        final Iterable<EventData> eventDataList = 
Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
         eventProcessor.onEvents(partitionContext, eventDataList);
 
         processSession.assertCommitted();
@@ -121,13 +119,11 @@ public class TestConsumeAzureEventHub {
                 
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent1.getTransitUri());
     }
 
-
     @Test
     public void testReceiveTwo() throws Exception {
-
         final Iterable<EventData> eventDataList = Arrays.asList(
-                new EventData("one".getBytes(StandardCharsets.UTF_8)),
-                new EventData("two".getBytes(StandardCharsets.UTF_8))
+                EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("two".getBytes(StandardCharsets.UTF_8))
         );
         eventProcessor.onEvents(partitionContext, eventDataList);
 
@@ -145,10 +141,9 @@ public class TestConsumeAzureEventHub {
 
     @Test
     public void testCheckpointFailure() throws Exception {
-
         final Iterable<EventData> eventDataList = Arrays.asList(
-                new EventData("one".getBytes(StandardCharsets.UTF_8)),
-                new EventData("two".getBytes(StandardCharsets.UTF_8))
+                EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("two".getBytes(StandardCharsets.UTF_8))
         );
         doThrow(new RuntimeException("Failed to create a 
checkpoint.")).when(partitionContext).checkpoint();
         eventProcessor.onEvents(partitionContext, eventDataList);
@@ -239,15 +234,13 @@ public class TestConsumeAzureEventHub {
                         .thenThrow(new MalformedRecordException("Simulating 
Record parse failure."))
                         .thenReturn(records2[0], Arrays.copyOfRange(records2, 
1, records2.length));
         }
-
     }
 
     @Test
     public void testReceiveRecords() throws Exception {
-
         final List<EventData> eventDataList = Arrays.asList(
-                new EventData("one".getBytes(StandardCharsets.UTF_8)),
-                new EventData("two".getBytes(StandardCharsets.UTF_8))
+                EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("two".getBytes(StandardCharsets.UTF_8))
         );
 
         setupRecordReader(eventDataList);
@@ -274,12 +267,11 @@ public class TestConsumeAzureEventHub {
 
     @Test
     public void testReceiveRecordReaderFailure() throws Exception {
-
         final List<EventData> eventDataList = Arrays.asList(
-                new EventData("one".getBytes(StandardCharsets.UTF_8)),
-                new EventData("two".getBytes(StandardCharsets.UTF_8)),
-                new EventData("three".getBytes(StandardCharsets.UTF_8)),
-                new EventData("four".getBytes(StandardCharsets.UTF_8))
+                EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("two".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("three".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("four".getBytes(StandardCharsets.UTF_8))
         );
 
         setupRecordReader(eventDataList, 2, null);
@@ -319,9 +311,8 @@ public class TestConsumeAzureEventHub {
 
     @Test
     public void testReceiveAllRecordFailure() throws Exception {
-
         final List<EventData> eventDataList = Collections.singletonList(
-                new EventData("one".getBytes(StandardCharsets.UTF_8))
+                EventData.create("one".getBytes(StandardCharsets.UTF_8))
         );
 
         setupRecordReader(eventDataList, 0, null);
@@ -348,17 +339,15 @@ public class TestConsumeAzureEventHub {
         assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent1.getEventType());
         assertEquals("amqps://namespace.servicebus.windows.net/" +
                 
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent1.getTransitUri());
-
     }
 
     @Test
     public void testReceiveRecordWriterFailure() throws Exception {
-
         final List<EventData> eventDataList = Arrays.asList(
-                new EventData("one".getBytes(StandardCharsets.UTF_8)),
-                new EventData("two".getBytes(StandardCharsets.UTF_8)),
-                new EventData("three".getBytes(StandardCharsets.UTF_8)),
-                new EventData("four".getBytes(StandardCharsets.UTF_8))
+                EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("two".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("three".getBytes(StandardCharsets.UTF_8)),
+                EventData.create("four".getBytes(StandardCharsets.UTF_8))
         );
 
         setupRecordReader(eventDataList, -1, "two");
@@ -395,5 +384,4 @@ public class TestConsumeAzureEventHub {
         assertEquals("amqps://namespace.servicebus.windows.net/" +
                 
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent2.getTransitUri());
     }
-
 }

Reply via email to