This is an automated email from the ASF dual-hosted git repository.
jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 39a08025c3 NIFI-8677 Added endpoint suffix for Azure EventHub
Processors
39a08025c3 is described below
commit 39a08025c3a3d2556ae11c2bf2fcda5154cde470
Author: Timea Barna <[email protected]>
AuthorDate: Wed Aug 11 09:29:56 2021 +0200
NIFI-8677 Added endpoint suffix for Azure EventHub Processors
This closes #5303
Signed-off-by: Joey Frazee <[email protected]>
---
.../azure/eventhub/ConsumeAzureEventHub.java | 29 +++++++++++++-------
.../azure/eventhub/GetAzureEventHub.java | 20 ++++++--------
.../azure/eventhub/PutAzureEventHub.java | 20 +++++++++-----
.../azure/eventhub/utils/AzureEventHubUtils.java | 31 +++++++++++++++++++---
.../azure/eventhub/PutAzureEventHubTest.java | 5 +++-
.../azure/eventhub/TestConsumeAzureEventHub.java | 31 ++++++++++------------
6 files changed, 87 insertions(+), 49 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 2b0f2e3f43..92d34ddef9 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -93,8 +93,8 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
private static final Pattern SAS_TOKEN_PATTERN =
Pattern.compile("^\\?.*$");
- private static final String
FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY =
"DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
- private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN
= "BlobEndpoint=https://%s.blob.core.windows.net/;SharedAccessSignature=%s";
+ private static final String
FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY =
"DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.%s";
+ private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN
= "BlobEndpoint=https://%s.blob.core.%s/;SharedAccessSignature=%s";
static final PropertyDescriptor NAMESPACE = new
PropertyDescriptor.Builder()
.name("event-hub-namespace")
@@ -112,7 +112,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
- // TODO: Do we need to support custom service endpoints as
GetAzureEventHub does? Is it possible?
+ static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor ACCESS_POLICY_NAME = new
PropertyDescriptor.Builder()
.name("event-hub-shared-access-policy-name")
.displayName("Shared Access Policy Name")
@@ -271,7 +271,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
static {
PROPERTIES = Collections.unmodifiableList(Arrays.asList(
- NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
+ NAMESPACE, EVENT_HUB_NAME, SERVICE_BUS_ENDPOINT,
ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP,
CONSUMER_HOSTNAME,
RECORD_READER, RECORD_WRITER,
INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT,
STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN,
STORAGE_CONTAINER_NAME
@@ -293,6 +293,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
private volatile String namespaceName;
private volatile boolean isRecordReaderSet = false;
private volatile boolean isRecordWriterSet = false;
+ private volatile String serviceBusEndpoint;
/**
* For unit test.
@@ -322,6 +323,13 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
this.writerFactory = writerFactory;
}
+ /**
+ * For unit test.
+ */
+ public void setServiceBusEndpoint(String serviceBusEndpoint) {
+ this.serviceBusEndpoint = serviceBusEndpoint;
+ }
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
@@ -464,7 +472,8 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
private void transferTo(Relationship relationship, ProcessSession
session, StopWatch stopWatch,
String eventHubName, String partitionId,
String consumerGroup, FlowFile flowFile) {
session.transfer(flowFile, relationship);
- final String transitUri = "amqps://" + namespaceName +
".servicebus.windows.net/" + eventHubName + "/ConsumerGroups/" + consumerGroup
+ "/Partitions/" + partitionId;
+ final String transitUri =
String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s",
+ namespaceName, serviceBusEndpoint, eventHubName,
consumerGroup, partitionId);
session.getProvenanceReporter().receive(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
@@ -652,13 +661,13 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
final String connectionString;
final boolean useManagedIdentity =
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
if(useManagedIdentity) {
- connectionString =
AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName,
eventHubName);
+ connectionString =
AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName,
serviceBusEndpoint, eventHubName);
} else {
final String sasName =
context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
final String sasKey =
context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
- connectionString =
AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName,
eventHubName, sasName, sasKey);
+ connectionString =
AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName,
serviceBusEndpoint, eventHubName, sasName, sasKey);
}
eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder
@@ -680,13 +689,15 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor {
final String storageAccountName =
context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
+ serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+ final String domainName = serviceBusEndpoint.replace(".servicebus.",
"");
final String storageAccountKey =
context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken =
context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
if (storageAccountKey != null) {
- return
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY,
storageAccountName, storageAccountKey);
+ return
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY,
storageAccountName, storageAccountKey, domainName);
}
- return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN,
storageAccountName, storageSasToken);
+ return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN,
storageAccountName, domainName, storageSasToken);
}
private String orDefault(String value, String defaultValue) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index f40f668e92..27c836ce3e 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -81,6 +81,9 @@ import
org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@WritesAttribute(attribute = "eventhub.property.*", description = "The
application properties of this message. IE: 'application' would be
'eventhub.property.application'")
})
public class GetAzureEventHub extends AbstractProcessor {
+ private static final String TRANSIT_URI_FORMAT_STRING =
"amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s";
+ private static final String FORMAT_STRING_FOR_CONECTION_BUILDER =
"amqps://%s%s";
+
static final PropertyDescriptor EVENT_HUB_NAME = new
PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the event hub to pull messages from")
@@ -94,15 +97,7 @@ public class GetAzureEventHub extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
- static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new
PropertyDescriptor.Builder()
- .name("Service Bus Endpoint")
- .description("To support namespaces in non-standard Host URIs (
not .servicebus.windows.net, ie .servicebus.chinacloudapi.cn) select from the
drop down acceptable options ")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-
.allowableValues(".servicebus.windows.net",".servicebus.chinacloudapi.cn")
- .defaultValue(".servicebus.windows.net")
- .required(true)
- .build();
+ static final PropertyDescriptor SERVICE_BUS_ENDPOINT
=AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor ACCESS_POLICY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Listen claims.")
@@ -315,12 +310,12 @@ public class GetAzureEventHub extends AbstractProcessor {
final String connectionString;
if(useManagedIdentity){
- connectionString =
AzureEventHubUtils.getManagedIdentityConnectionString(namespace,eventHubName);
+ connectionString =
AzureEventHubUtils.getManagedIdentityConnectionString(namespace,
serviceBusEndpoint, eventHubName);
} else {
final String policyName =
context.getProperty(ACCESS_POLICY).getValue();
final String policyKey =
context.getProperty(POLICY_PRIMARY_KEY).getValue();
connectionString = new ConnectionStringBuilder()
- .setEndpoint(new
URI("amqps://"+namespace+serviceBusEndpoint))
+ .setEndpoint(new
URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace,
serviceBusEndpoint)))
.setEventHubName(eventHubName)
.setSasKeyName(policyName)
.setSasKey(policyKey).toString();
@@ -394,7 +389,8 @@ public class GetAzureEventHub extends AbstractProcessor {
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroup =
context.getProperty(CONSUMER_GROUP).getValue();
final String serviceBusEndPoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
- final String transitUri = "amqps://" + namespace +
serviceBusEndPoint + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup +
"/Partitions/" + partitionId;
+ final String transitUri =
String.format(TRANSIT_URI_FORMAT_STRING,
+ namespace, serviceBusEndPoint, eventHubName,
consumerGroup, partitionId);
session.getProvenanceReporter().receive(flowFile,
transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index 51bb5bf611..f588f76a59 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -76,6 +76,8 @@ import org.apache.nifi.util.StopWatch;
+ "Also please be aware that this processor creates a thread pool of 4
threads for Event Hub Client. They will be extra threads other than the
concurrent tasks scheduled for this processor.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PutAzureEventHub extends AbstractProcessor {
+ private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s%s/%s";
+
static final PropertyDescriptor EVENT_HUB_NAME = new
PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the event hub to send to")
@@ -89,6 +91,7 @@ public class PutAzureEventHub extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
+ static final PropertyDescriptor SERVICE_BUS_ENDPOINT =
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor ACCESS_POLICY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy
must have Send claims.")
@@ -140,6 +143,7 @@ public class PutAzureEventHub extends AbstractProcessor {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(EVENT_HUB_NAME);
_propertyDescriptors.add(NAMESPACE);
+ _propertyDescriptors.add(SERVICE_BUS_ENDPOINT);
_propertyDescriptors.add(ACCESS_POLICY);
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
_propertyDescriptors.add(USE_MANAGED_IDENTITY);
@@ -241,7 +245,9 @@ public class PutAzureEventHub extends AbstractProcessor {
if(flowFileResult.getResult() == REL_SUCCESS) {
final String namespace =
context.getProperty(NAMESPACE).getValue();
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
- session.getProvenanceReporter().send(flowFile, "amqps://"
+ namespace + ".servicebus.windows.net" + "/" + eventHubName,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+ final String transitUri =
String.format(TRANSIT_URI_FORMAT_STRING, namespace, serviceBusEndpoint,
eventHubName);
+ session.getProvenanceReporter().send(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} else {
@@ -330,9 +336,10 @@ public class PutAzureEventHub extends AbstractProcessor {
policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
}
final String namespace = context.getProperty(NAMESPACE).getValue();
+ final String serviceBusEndpoint =
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final String eventHubName =
context.getProperty(EVENT_HUB_NAME).getValue();
for (int i = 0; i < numThreads; i++) {
- final EventHubClient client = createEventHubClient(namespace,
eventHubName, policyName, policyKey, executor);
+ final EventHubClient client = createEventHubClient(namespace,
serviceBusEndpoint, eventHubName, policyName, policyKey, executor);
if(null != client) {
senderQueue.offer(client);
}
@@ -351,6 +358,7 @@ public class PutAzureEventHub extends AbstractProcessor {
*/
protected EventHubClient createEventHubClient(
final String namespace,
+ final String serviceBusEndpoint,
final String eventHubName,
final String policyName,
final String policyKey,
@@ -361,9 +369,9 @@ public class PutAzureEventHub extends AbstractProcessor {
EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
final String connectionString;
if(policyName == AzureEventHubUtils.MANAGED_IDENTITY_POLICY) {
- connectionString =
AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName);
+ connectionString =
AzureEventHubUtils.getManagedIdentityConnectionString(namespace,
serviceBusEndpoint, eventHubName);
} else{
- connectionString = getConnectionString(namespace,
eventHubName, policyName, policyKey);
+ connectionString = getConnectionString(namespace,
serviceBusEndpoint, eventHubName, policyName, policyKey);
}
return
EventHubClient.createFromConnectionStringSync(connectionString, executor);
} catch (IOException | EventHubException |
IllegalConnectionStringFormatException e) {
@@ -372,8 +380,8 @@ public class PutAzureEventHub extends AbstractProcessor {
}
}
- protected String getConnectionString(final String namespace, final String
eventHubName, final String policyName, final String policyKey){
- return
AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace,
eventHubName, policyName, policyKey);
+ protected String getConnectionString(final String namespace, final String
serviceBusEndpoint, final String eventHubName, final String policyName, final
String policyKey){
+ return
AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace,
serviceBusEndpoint, eventHubName, policyName, policyKey);
}
/**
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
index f71f6c8495..cb19f2e54b 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -33,6 +34,10 @@ import org.apache.nifi.processor.util.StandardValidators;
public final class AzureEventHubUtils {
public static final String MANAGED_IDENTITY_POLICY =
ConnectionStringBuilder.MANAGED_IDENTITY_AUTHENTICATION;
+ public static final AllowableValue AZURE_ENDPOINT = new
AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for
general use");
+ public static final AllowableValue AZURE_CHINA_ENDPOINT = new
AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus
endpoint for China");
+ public static final AllowableValue AZURE_GERMANY_ENDPOINT = new
AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint
for Germany");
+ public static final AllowableValue AZURE_US_GOV_ENDPOINT = new
AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government",
"Servicebus endpoint for US Government");
public static final PropertyDescriptor POLICY_PRIMARY_KEY = new
PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
@@ -50,6 +55,17 @@ public final class AzureEventHubUtils {
.required(false).defaultValue("false").allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+ public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new
PropertyDescriptor.Builder()
+ .name("Service Bus Endpoint")
+ .description("To support namespaces not in the default windows.net
domain.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues(AzureEventHubUtils.AZURE_ENDPOINT,
AzureEventHubUtils.AZURE_CHINA_ENDPOINT,
+ AzureEventHubUtils.AZURE_GERMANY_ENDPOINT,
AzureEventHubUtils.AZURE_US_GOV_ENDPOINT)
+ .defaultValue(AzureEventHubUtils.AZURE_ENDPOINT.getValue())
+ .required(true)
+ .build();
+
public static List<ValidationResult> customValidate(PropertyDescriptor
accessPolicyDescriptor,
PropertyDescriptor policyKeyDescriptor,
ValidationContext context) {
@@ -79,13 +95,15 @@ public final class AzureEventHubUtils {
return retVal;
}
- public static String getManagedIdentityConnectionString(final String
namespace, final String eventHubName){
- return new
ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName)
+ public static String getManagedIdentityConnectionString(final String
namespace, final String domainName, final String eventHubName){
+ return new ConnectionStringBuilder()
+ .setEndpoint(namespace, removeStartingDotFrom(domainName))
+ .setEventHubName(eventHubName)
.setAuthentication(MANAGED_IDENTITY_POLICY).toString();
}
- public static String getSharedAccessSignatureConnectionString(final String
namespace, final String eventHubName, final String sasName, final String
sasKey) {
+ public static String getSharedAccessSignatureConnectionString(final String
namespace, final String domainName, final String eventHubName, final String
sasName, final String sasKey) {
return new ConnectionStringBuilder()
- .setNamespaceName(namespace)
+ .setEndpoint(namespace, removeStartingDotFrom(domainName))
.setEventHubName(eventHubName)
.setSasKeyName(sasName)
.setSasKey(sasKey).toString();
@@ -103,4 +121,9 @@ public final class AzureEventHubUtils {
return properties;
}
+
+ private static String removeStartingDotFrom(final String domainName) {
+ return domainName.replaceFirst("^\\.", "");
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index d83aae4813..9f0bf5f0c8 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -328,6 +328,7 @@ public class PutAzureEventHubTest {
@Override
protected EventHubClient createEventHubClient(
final String namespace,
+ final String serviceBusEndpoint,
final String eventHubName,
final String policyName,
final String policyKey,
@@ -346,6 +347,7 @@ public class PutAzureEventHubTest {
@Override
protected EventHubClient createEventHubClient(
final String namespace,
+ final String serviceBusEndpoint,
final String eventHubName,
final String policyName,
final String policyKey,
@@ -355,7 +357,7 @@ public class PutAzureEventHubTest {
}
private static class BogusConnectionStringMockPutAzureEventHub extends
PutAzureEventHub{
@Override
- protected String getConnectionString(final String namespace, final
String eventHubName, final String policyName, final String policyKey){
+ protected String getConnectionString(final String namespace, final
String serviceBusEndpoint, final String eventHubName, final String policyName,
final String policyKey){
return "Bogus Connection String";
}
}
@@ -371,6 +373,7 @@ public class PutAzureEventHubTest {
@Override
protected EventHubClient createEventHubClient(
final String namespace,
+ final String serviceBusEndpoint,
final String eventHubName,
final String policyName,
final String policyKey,
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 659e9a6f9a..85dead6e13 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -78,6 +78,10 @@ public class TestConsumeAzureEventHub {
private static final String storageAccountName = "test-sa";
private static final String storageAccountKey = "test-sa-key";
private static final String storageSasToken = "?test-sa-token";
+ private static final String serviceBusEndpoint = ".endpoint";
+
+ private static final String EXPECTED_TRANSIT_URI = "amqps://namespace" +
serviceBusEndpoint + "/" +
+
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id";
private ConsumeAzureEventHub.EventProcessor eventProcessor;
private MockProcessSession processSession;
@@ -98,6 +102,7 @@ public class TestConsumeAzureEventHub {
final ProcessSessionFactory processSessionFactory =
Mockito.mock(ProcessSessionFactory.class);
processor.setProcessSessionFactory(processSessionFactory);
processor.setNamespaceName("namespace");
+ processor.setServiceBusEndpoint(serviceBusEndpoint);
sharedState = new SharedSessionState(processor, new AtomicLong(0));
processSession = new MockProcessSession(sharedState, processor);
@@ -130,7 +135,6 @@ public class TestConsumeAzureEventHub {
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME,
storageAccountName);
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY,
storageAccountKey);
testRunner.assertNotValid();
-
testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
testRunner.assertValid();
}
@@ -228,8 +232,7 @@ public class TestConsumeAzureEventHub {
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent1.getEventType());
- assertEquals("amqps://namespace.servicebus.windows.net/" +
-
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent1.getTransitUri());
+ assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
}
@Test
@@ -340,8 +343,8 @@ public class TestConsumeAzureEventHub {
default:
final List<Record> recordList1 =
addEndRecord.apply(recordList.subList(0, throwExceptionAt));
final List<Record> recordList2 =
addEndRecord.apply(recordList.subList(throwExceptionAt + 1, recordList.size()));
- final Record[] records1 = recordList1.toArray(new Record[0]);
- final Record[] records2 = recordList2.toArray(new Record[0]);
+ final Record[] records1 = recordList1.toArray(new
Record[recordList1.size()]);
+ final Record[] records2 = recordList2.toArray(new
Record[recordList2.size()]);
when(reader.nextRecord())
.thenReturn(records1[0], Arrays.copyOfRange(records1,
1, records1.length))
.thenThrow(new MalformedRecordException("Simulating
Record parse failure."))
@@ -374,8 +377,7 @@ public class TestConsumeAzureEventHub {
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent1.getEventType());
- assertEquals("amqps://namespace.servicebus.windows.net/" +
-
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent1.getTransitUri());
+ assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
}
@Test
@@ -413,13 +415,11 @@ public class TestConsumeAzureEventHub {
final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent1.getEventType());
- assertEquals("amqps://namespace.servicebus.windows.net/" +
-
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent1.getTransitUri());
+ assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1);
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent2.getEventType());
- assertEquals("amqps://namespace.servicebus.windows.net/" +
-
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent2.getTransitUri());
+ assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent2.getTransitUri());
}
@Test
@@ -450,8 +450,7 @@ public class TestConsumeAzureEventHub {
final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent1.getEventType());
- assertEquals("amqps://namespace.servicebus.windows.net/" +
-
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent1.getTransitUri());
+ assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
}
@Test
@@ -489,12 +488,10 @@ public class TestConsumeAzureEventHub {
final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent1.getEventType());
- assertEquals("amqps://namespace.servicebus.windows.net/" +
-
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent1.getTransitUri());
+ assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1);
assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent2.getEventType());
- assertEquals("amqps://namespace.servicebus.windows.net/" +
-
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id",
provenanceEvent2.getTransitUri());
+ assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent2.getTransitUri());
}
}