This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 5c69faf NIFI-6597: Upgrade Azure Event Hub version and code. - Lazy
instantiation for PutAzureEventHub. Also add explaination for thread pool
needed for EventHubClient.
5c69faf is described below
commit 5c69faf9bb46a9be9b3e7616f981c11437d33ac1
Author: mysunnytime <[email protected]>
AuthorDate: Mon Aug 19 16:51:04 2019 -0700
NIFI-6597: Upgrade Azure Event Hub version and code.
- Lazy instantiation for PutAzureEventHub. Also add explaination for thread
pool needed for EventHubClient.
---
.../nifi-azure-processors/pom.xml | 5 +-
.../azure/eventhub/ConsumeAzureEventHub.java | 14 ++--
.../azure/eventhub/GetAzureEventHub.java | 85 +++++++++++----------
.../azure/eventhub/PutAzureEventHub.java | 89 +++++++++++++---------
.../azure/eventhub/GetAzureEventHubTest.java | 34 +++------
.../azure/eventhub/PutAzureEventHubTest.java | 24 +++---
.../azure/eventhub/TestConsumeAzureEventHub.java | 44 ++++-------
7 files changed, 146 insertions(+), 149 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 6df6b5b..4d30e6b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -20,7 +20,8 @@
<artifactId>nifi-azure-processors</artifactId>
<packaging>jar</packaging>
<properties>
- <azure-eventhubs.version>0.14.4</azure-eventhubs.version>
+ <azure-eventhubs.version>2.3.2</azure-eventhubs.version>
+ <azure-eventhubs-eph.version>2.5.2</azure-eventhubs-eph.version>
</properties>
<dependencies>
<dependency>
@@ -57,7 +58,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
- <version>${azure-eventhubs.version}</version>
+ <version>${azure-eventhubs-eph.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 203dd5b..800f26d 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -23,8 +23,8 @@ import
com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
+import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -243,7 +243,6 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
.required(false)
.build();
-
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Event Hub.")
@@ -531,7 +530,6 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
" consumerGroupName={}, exception={}",
new Object[]{context.getEventHubPath(),
context.getPartitionId(), context.getConsumerGroupName(), e}, e);
}
-
}
@Override
@@ -606,9 +604,9 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
final EventProcessorOptions options = new EventProcessorOptions();
final String initialOffset =
context.getProperty(INITIAL_OFFSET).getValue();
if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(initialOffset)) {
- options.setInitialOffsetProvider(options.new
StartOfStreamInitialOffsetProvider());
+ options.setInitialPositionProvider(options.new
StartOfStreamInitialPositionProvider());
} else if
(INITIAL_OFFSET_END_OF_STREAM.getValue().equals(initialOffset)){
- options.setInitialOffsetProvider(options.new
EndOfStreamInitialOffsetProvider());
+ options.setInitialPositionProvider(options.new
EndOfStreamInitialPositionProvider());
} else {
throw new IllegalArgumentException("Initial offset " +
initialOffset + " is not allowed.");
}
@@ -629,7 +627,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
final String storageConnectionString =
String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName,
storageAccountKey);
- final ConnectionStringBuilder eventHubConnectionString = new
ConnectionStringBuilder(namespaceName, eventHubName, sasName, sasKey);
+ final ConnectionStringBuilder eventHubConnectionString = new
ConnectionStringBuilder().setNamespaceName(namespaceName).setEventHubName(
eventHubName).setSasKeyName(sasName).setSasKey(sasKey);
eventProcessorHost = new EventProcessorHost(consumerHostname,
eventHubName, consumerGroupName, eventHubConnectionString.toString(),
storageConnectionString, containerName);
@@ -639,7 +637,6 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
new Object[]{eventHubName, consumerGroupName,
e.getPartitionId(), e.getAction(), e.getHostname()}, e.getException());
});
-
eventProcessorHost.registerEventProcessorFactory(new
EventProcessorFactory(), options).get();
}
@@ -652,5 +649,4 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
throw new IllegalArgumentException(String.format("'%s' is
required, but not specified.", property.getDisplayName()));
}
}
-
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 3334703..5843cbe 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -16,11 +16,35 @@
*/
package org.apache.nifi.processors.azure.eventhub;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.ServiceBusException;
+import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
+
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -40,27 +64,9 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming",
"streams"})
-@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub,
writing the contents of the Azure message to the content of the FlowFile")
+@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub,
writing the contents of the Azure message to the content of the FlowFile. "
+ + "Note: Please be aware that this processor creates a thread pool of
4 threads for Event Hub Client. They will be extra threads other than the
concurrent tasks scheduled for this processor.")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({
@WritesAttribute(attribute = "eventhub.enqueued.timestamp",
description = "The time (in milliseconds since epoch, UTC) at which the message
was enqueued in the Azure Event Hub"),
@@ -70,7 +76,6 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "eventhub.partition", description = "The
name of the Azure Partition from which the message was pulled")
})
public class GetAzureEventHub extends AbstractProcessor {
-
static final PropertyDescriptor EVENT_HUB_NAME = new
PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the Azure Event Hub to pull messages
from")
@@ -128,9 +133,9 @@ public class GetAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor ENQUEUE_TIME = new
PropertyDescriptor.Builder()
.name("Event Hub Message Enqueue Time")
- .description("A timestamp (ISO-8061 Instant) formatted as
YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
+ .description("A timestamp (ISO-8601 Instant) formatted as
YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
+ "should have been enqueued in the EventHub to start
reading from")
- .addValidator(StandardValidators.ISO8061_INSTANT_VALIDATOR)
+ .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
@@ -198,16 +203,16 @@ public class GetAzureEventHub extends AbstractProcessor {
return propertyDescriptors;
}
-
- protected void setupReceiver(final String connectionString) throws
ProcessException {
+ protected void setupReceiver(final String connectionString, final
ScheduledExecutorService executor) throws ProcessException {
try {
- eventHubClient =
EventHubClient.createFromConnectionString(connectionString).get();
- } catch (InterruptedException | ExecutionException | IOException |
ServiceBusException e) {
+ EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
+ eventHubClient = EventHubClient.createSync(connectionString,
executor);
+ } catch (IOException | EventHubException e) {
throw new ProcessException(e);
}
}
- PartitionReceiver getReceiver(final ProcessContext context, final String
partitionId) throws IOException, ServiceBusException, ExecutionException,
InterruptedException {
+ PartitionReceiver getReceiver(final ProcessContext context, final String
partitionId) throws IOException, EventHubException, ExecutionException,
InterruptedException {
PartitionReceiver existingReceiver =
partitionToReceiverMap.get(partitionId);
if (existingReceiver != null) {
return existingReceiver;
@@ -232,7 +237,8 @@ public class GetAzureEventHub extends AbstractProcessor {
final PartitionReceiver receiver = eventHubClient.createReceiver(
consumerGroupName,
partitionId,
- configuredEnqueueTime == null ? Instant.now() :
configuredEnqueueTime).get();
+ EventPosition.fromEnqueuedTime(
+ configuredEnqueueTime == null ? Instant.now() :
configuredEnqueueTime)).get();
receiver.setReceiveTimeout(receiverFetchTimeout == null ?
Duration.ofMillis(60000) : receiverFetchTimeout);
partitionToReceiverMap.put(partitionId, receiver);
@@ -257,7 +263,7 @@ public class GetAzureEventHub extends AbstractProcessor {
try {
receiver = getReceiver(context, partitionId);
return receiver.receive(receiverFetchSize).get();
- } catch (final IOException | ServiceBusException | ExecutionException
| InterruptedException e) {
+ } catch (final EventHubException | IOException | ExecutionException |
InterruptedException e) {
throw new ProcessException(e);
}
}
@@ -275,11 +281,14 @@ public class GetAzureEventHub extends AbstractProcessor {
if (null != eventHubClient) {
eventHubClient.closeSync();
}
- } catch (final ServiceBusException e) {
+ executor.shutdown();
+ } catch (final EventHubException e) {
throw new ProcessException(e);
}
}
+ private ScheduledExecutorService executor;
+
@OnScheduled
public void onScheduled(final ProcessContext context) throws
ProcessException, URISyntaxException {
final BlockingQueue<String> partitionNames = new
LinkedBlockingQueue<>();
@@ -294,8 +303,6 @@ public class GetAzureEventHub extends AbstractProcessor {
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
-
-
if(context.getProperty(ENQUEUE_TIME).isSet()) {
configuredEnqueueTime =
Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
} else {
@@ -312,11 +319,12 @@ public class GetAzureEventHub extends AbstractProcessor {
receiverFetchTimeout = null;
}
- final String connectionString = new ConnectionStringBuilder(new
URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName,
policyKey).toString();
- setupReceiver(connectionString);
+ executor = Executors.newScheduledThreadPool(4);
+ final String connectionString = new
ConnectionStringBuilder().setEndpoint(
+ new
URI("amqps://"+namespace+serviceBusEndpoint)).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
+ setupReceiver(connectionString, executor);
}
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
final BlockingQueue<String> partitionIds = this.partitionNames;
@@ -370,5 +378,4 @@ public class GetAzureEventHub extends AbstractProcessor {
partitionIds.offer(partitionId);
}
}
-
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index 6ebed60..e6de648 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -16,16 +16,30 @@
*/
package org.apache.nifi.processors.azure.eventhub;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
-import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import com.microsoft.azure.servicebus.IllegalConnectionStringFormatException;
-import com.microsoft.azure.servicebus.ServiceBusException;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.IllegalConnectionStringFormatException;
+import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
+
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -42,23 +56,12 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-
@SupportsBatching
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams",
"streaming"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure
Event Hub. Note: the content of the FlowFile will be buffered into memory
before being sent, "
- + "so care should be taken to avoid sending FlowFiles to this
Processor that exceed the amount of Java Heap Space available.")
+ + "so care should be taken to avoid sending FlowFiles to this
Processor that exceed the amount of Java Heap Space available. "
+ + "Also please be aware that this processor creates a thread pool of 4
threads for Event Hub Client. They will be extra threads other than the
concurrent tasks scheduled for this processor.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PutAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor EVENT_HUB_NAME = new
PropertyDescriptor.Builder()
@@ -131,22 +134,10 @@ public class PutAzureEventHub extends AbstractProcessor {
return propertyDescriptors;
}
+ private ScheduledExecutorService executor;
+
@OnScheduled
public final void setupClient(final ProcessContext context) throws
ProcessException{
- final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
- final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
- final String namespace = context.getProperty(NAMESPACE).getValue();
- final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
-
-
- final int numThreads = context.getMaxConcurrentTasks();
- senderQueue = new LinkedBlockingQueue<>(numThreads);
- for (int i = 0; i < numThreads; i++) {
- final EventHubClient client = createEventHubClient(namespace,
eventHubName, policyName, policyKey);
- if(null != client) {
- senderQueue.offer(client);
- }
- }
}
@OnStopped
@@ -159,6 +150,22 @@ public class PutAzureEventHub extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ if(senderQueue.size() == 0){
+ final int numThreads = context.getMaxConcurrentTasks();
+ senderQueue = new LinkedBlockingQueue<>(numThreads);
+ executor = Executors.newScheduledThreadPool(4);
+ final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
+ final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ final String namespace = context.getProperty(NAMESPACE).getValue();
+ final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
+ for (int i = 0; i < numThreads; i++) {
+ final EventHubClient client = createEventHubClient(namespace,
eventHubName, policyName, policyKey, executor);
+ if(null != client) {
+ senderQueue.offer(client);
+ }
+ }
+ }
+
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
@@ -183,25 +190,32 @@ public class PutAzureEventHub extends AbstractProcessor {
}
- protected EventHubClient createEventHubClient(final String namespace,
final String eventHubName, final String policyName, final String policyKey)
throws ProcessException{
+ protected EventHubClient createEventHubClient(
+ final String namespace,
+ final String eventHubName,
+ final String policyName,
+ final String policyKey,
+ final ScheduledExecutorService executor)
+ throws ProcessException{
try {
- return
EventHubClient.createFromConnectionString(getConnectionString(namespace,
eventHubName, policyName, policyKey)).get();
- } catch (InterruptedException | ExecutionException | IOException |
ServiceBusException | IllegalConnectionStringFormatException e) {
+ EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
+ return EventHubClient.createSync(getConnectionString(namespace,
eventHubName, policyName, policyKey), executor);
+ } catch (IOException | EventHubException |
IllegalConnectionStringFormatException e) {
getLogger().error("Failed to create EventHubClient due to {}", e);
throw new ProcessException(e);
}
}
protected String getConnectionString(final String namespace, final String
eventHubName, final String policyName, final String policyKey){
- return new ConnectionStringBuilder(namespace, eventHubName,
policyName, policyKey).toString();
+ return new
ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
}
protected void sendMessage(final byte[] buffer) throws ProcessException {
final EventHubClient sender = senderQueue.poll();
if(null != sender) {
try {
- sender.sendSync(new EventData(buffer));
- } catch (final ServiceBusException sbe) {
+ sender.sendSync(EventData.create(buffer));
+ } catch (final EventHubException sbe) {
throw new ProcessException("Caught exception trying to send
message to eventbus", sbe);
} finally {
senderQueue.offer(sender);
@@ -209,6 +223,5 @@ public class PutAzureEventHub extends AbstractProcessor {
}else{
throw new ProcessException("No EventHubClients are configured for
sending");
}
-
}
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index df47d5f..6e55f60 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -19,11 +19,10 @@ package org.apache.nifi.processors.azure.eventhub;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventData.SystemProperties;
import com.microsoft.azure.eventhubs.PartitionReceiver;
-import com.microsoft.azure.servicebus.ServiceBusException;
-import com.microsoft.azure.servicebus.amqp.AmqpConstants;
+import com.microsoft.azure.eventhubs.EventHubException;
+import com.microsoft.azure.eventhubs.impl.AmqpConstants;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
@@ -31,8 +30,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
-import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
@@ -41,9 +40,7 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
-
public class GetAzureEventHubTest {
-
private static final String namespaceName = "nifi-azure-hub";
private static final String eventHubName = "get-test";
private static final String sasKeyName = "bogus-policy";
@@ -158,11 +155,11 @@ public class GetAzureEventHubTest {
boolean received = true;
@Override
- protected void setupReceiver(final String connectionString) throws
ProcessException{
+ protected void setupReceiver(final String connectionString, final
ScheduledExecutorService executor) throws ProcessException{
//do nothing
}
@Override
- protected PartitionReceiver getReceiver(final ProcessContext context,
final String partitionId) throws IOException, ServiceBusException,
ExecutionException, InterruptedException {
+ protected PartitionReceiver getReceiver(final ProcessContext context,
final String partitionId) throws IOException, EventHubException,
ExecutionException, InterruptedException {
if(getReceiverThrow){
throw new IOException("Could not create receiver");
}
@@ -179,7 +176,7 @@ public class GetAzureEventHubTest {
}
final LinkedList<EventData> receivedEvents = new LinkedList<>();
for(int i = 0; i < 10; i++){
- EventData eventData = new EventData(String.format("test event
number: %d", i).getBytes());
+ EventData eventData = EventData.create(String.format("test
event number: %d", i).getBytes());
if (received) {
HashMap<String, Object> properties = new HashMap<>();
properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME,
PARTITION_KEY_VALUE);
@@ -188,26 +185,18 @@ public class GetAzureEventHubTest {
properties.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME,
ENQUEUED_TIME_VALUE);
SystemProperties systemProperties = new
SystemProperties(properties);
- Field systemPropertiesField =
FieldUtils.getDeclaredField(EventData.class, "systemProperties", true);
- try {
- systemPropertiesField.set(eventData, systemProperties);
- } catch (IllegalAccessException e) {
- throw new ProcessException("Could not set
systemProperties on EventData", e);
- }
+ eventData.setSystemProperties(systemProperties);
}
receivedEvents.add(eventData);
}
return receivedEvents;
-
}
}
public static class MockGetAzureEventHubNoPartitions extends
GetAzureEventHub{
-
-
@Override
- protected void setupReceiver(final String connectionString) throws
ProcessException{
+ protected void setupReceiver(final String connectionString, final
ScheduledExecutorService executor) throws ProcessException{
//do nothing
}
@@ -215,10 +204,10 @@ public class GetAzureEventHubTest {
public void onScheduled(final ProcessContext context) throws
ProcessException {
}
+ @Override
+ public void tearDown() throws ProcessException {
+ }
}
-
-
-
private void setUpStandardTestConfig() {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,eventHubName);
testRunner.setProperty(GetAzureEventHub.NAMESPACE,namespaceName);
@@ -227,5 +216,4 @@ public class GetAzureEventHubTest {
testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
testRunner.assertValid();
}
-
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 1129ac7..ab556c5 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -23,6 +23,8 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.ScheduledExecutorService;
+
public class PutAzureEventHubTest {
private static final String namespaceName = "nifi-azure-hub";
private static final String eventHubName = "get-test";
@@ -51,13 +53,10 @@ public class PutAzureEventHubTest {
}
@Test
public void verifyRelationships(){
-
assert(2 == processor.getRelationships().size());
-
}
@Test
public void testNoFlow() {
-
setUpStandardTestConfig();
testRunner.run(1, true);
testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_SUCCESS,
0);
@@ -65,7 +64,6 @@ public class PutAzureEventHubTest {
}
@Test
public void testNormalFlow(){
-
setUpStandardTestConfig();
String flowFileContents = "TEST MESSAGE";
testRunner.enqueue(flowFileContents);
@@ -76,7 +74,6 @@ public class PutAzureEventHubTest {
}
@Test
public void testSendMessageThrows() {
-
PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub
throwingProcessor = new
PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub();
testRunner = TestRunners.newTestRunner(throwingProcessor);
setUpStandardTestConfig();
@@ -89,7 +86,6 @@ public class PutAzureEventHubTest {
@Test(expected = AssertionError.class)
public void testBadConnectionString() {
-
PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub
badConnectionStringProcessor = new
PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub();
testRunner = TestRunners.newTestRunner(badConnectionStringProcessor);
setUpStandardTestConfig();
@@ -97,14 +93,18 @@ public class PutAzureEventHubTest {
}
private static class MockPutAzureEventHub extends PutAzureEventHub{
-
byte[] receivedBuffer = null;
byte[] getReceivedBuffer(){
return receivedBuffer;
}
@Override
- protected EventHubClient createEventHubClient(final String namespace,
final String eventHubName, final String policyName, final String policyKey)
throws ProcessException {
+ protected EventHubClient createEventHubClient(
+ final String namespace,
+ final String eventHubName,
+ final String policyName,
+ final String policyKey,
+ final ScheduledExecutorService executor) throws ProcessException {
return null;
}
@@ -115,12 +115,16 @@ public class PutAzureEventHubTest {
}
private static class OnSendThrowingMockPutAzureEventHub extends
PutAzureEventHub{
@Override
- protected EventHubClient createEventHubClient(final String namespace,
final String eventHubName, final String policyName, final String policyKey)
throws ProcessException {
+ protected EventHubClient createEventHubClient(
+ final String namespace,
+ final String eventHubName,
+ final String policyName,
+ final String policyKey,
+ final ScheduledExecutorService executor) throws ProcessException {
return null;
}
}
private static class BogusConnectionStringMockPutAzureEventHub extends
PutAzureEventHub{
-
@Override
protected String getConnectionString(final String namespace, final
String eventHubName, final String policyName, final String policyKey){
return "Bogus Connection String";
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 6f35093..e7edef7 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -66,7 +66,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestConsumeAzureEventHub {
-
private ConsumeAzureEventHub.EventProcessor eventProcessor;
private MockProcessSession processSession;
private SharedSessionState sharedState;
@@ -101,8 +100,7 @@ public class TestConsumeAzureEventHub {
@Test
public void testReceiveOne() throws Exception {
-
- final Iterable<EventData> eventDataList = Arrays.asList(new
EventData("one".getBytes(StandardCharsets.UTF_8)));
+ final Iterable<EventData> eventDataList =
Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
eventProcessor.onEvents(partitionContext, eventDataList);
processSession.assertCommitted();
@@ -121,13 +119,11 @@ public class TestConsumeAzureEventHub {
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent1.getTransitUri());
}
-
@Test
public void testReceiveTwo() throws Exception {
-
final Iterable<EventData> eventDataList = Arrays.asList(
- new EventData("one".getBytes(StandardCharsets.UTF_8)),
- new EventData("two".getBytes(StandardCharsets.UTF_8))
+ EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("two".getBytes(StandardCharsets.UTF_8))
);
eventProcessor.onEvents(partitionContext, eventDataList);
@@ -145,10 +141,9 @@ public class TestConsumeAzureEventHub {
@Test
public void testCheckpointFailure() throws Exception {
-
final Iterable<EventData> eventDataList = Arrays.asList(
- new EventData("one".getBytes(StandardCharsets.UTF_8)),
- new EventData("two".getBytes(StandardCharsets.UTF_8))
+ EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("two".getBytes(StandardCharsets.UTF_8))
);
doThrow(new RuntimeException("Failed to create a
checkpoint.")).when(partitionContext).checkpoint();
eventProcessor.onEvents(partitionContext, eventDataList);
@@ -239,15 +234,13 @@ public class TestConsumeAzureEventHub {
.thenThrow(new MalformedRecordException("Simulating
Record parse failure."))
.thenReturn(records2[0], Arrays.copyOfRange(records2,
1, records2.length));
}
-
}
@Test
public void testReceiveRecords() throws Exception {
-
final List<EventData> eventDataList = Arrays.asList(
- new EventData("one".getBytes(StandardCharsets.UTF_8)),
- new EventData("two".getBytes(StandardCharsets.UTF_8))
+ EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("two".getBytes(StandardCharsets.UTF_8))
);
setupRecordReader(eventDataList);
@@ -274,12 +267,11 @@ public class TestConsumeAzureEventHub {
@Test
public void testReceiveRecordReaderFailure() throws Exception {
-
final List<EventData> eventDataList = Arrays.asList(
- new EventData("one".getBytes(StandardCharsets.UTF_8)),
- new EventData("two".getBytes(StandardCharsets.UTF_8)),
- new EventData("three".getBytes(StandardCharsets.UTF_8)),
- new EventData("four".getBytes(StandardCharsets.UTF_8))
+ EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("two".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("three".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("four".getBytes(StandardCharsets.UTF_8))
);
setupRecordReader(eventDataList, 2, null);
@@ -319,9 +311,8 @@ public class TestConsumeAzureEventHub {
@Test
public void testReceiveAllRecordFailure() throws Exception {
-
final List<EventData> eventDataList = Collections.singletonList(
- new EventData("one".getBytes(StandardCharsets.UTF_8))
+ EventData.create("one".getBytes(StandardCharsets.UTF_8))
);
setupRecordReader(eventDataList, 0, null);
@@ -348,17 +339,15 @@ public class TestConsumeAzureEventHub {
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent1.getEventType());
assertEquals("amqps://namespace.servicebus.windows.net/" +
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent1.getTransitUri());
-
}
@Test
public void testReceiveRecordWriterFailure() throws Exception {
-
final List<EventData> eventDataList = Arrays.asList(
- new EventData("one".getBytes(StandardCharsets.UTF_8)),
- new EventData("two".getBytes(StandardCharsets.UTF_8)),
- new EventData("three".getBytes(StandardCharsets.UTF_8)),
- new EventData("four".getBytes(StandardCharsets.UTF_8))
+ EventData.create("one".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("two".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("three".getBytes(StandardCharsets.UTF_8)),
+ EventData.create("four".getBytes(StandardCharsets.UTF_8))
);
setupRecordReader(eventDataList, -1, "two");
@@ -395,5 +384,4 @@ public class TestConsumeAzureEventHub {
assertEquals("amqps://namespace.servicebus.windows.net/" +
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent2.getTransitUri());
}
-
}