This is an automated email from the ASF dual-hosted git repository.

jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 39a08025c3 NIFI-8677 Added endpoint suffix for Azure EventHub 
Processors
39a08025c3 is described below

commit 39a08025c3a3d2556ae11c2bf2fcda5154cde470
Author: Timea Barna <[email protected]>
AuthorDate: Wed Aug 11 09:29:56 2021 +0200

    NIFI-8677 Added endpoint suffix for Azure EventHub Processors
    
    This closes #5303
    
    Signed-off-by: Joey Frazee <[email protected]>
---
 .../azure/eventhub/ConsumeAzureEventHub.java       | 29 +++++++++++++-------
 .../azure/eventhub/GetAzureEventHub.java           | 20 ++++++--------
 .../azure/eventhub/PutAzureEventHub.java           | 20 +++++++++-----
 .../azure/eventhub/utils/AzureEventHubUtils.java   | 31 +++++++++++++++++++---
 .../azure/eventhub/PutAzureEventHubTest.java       |  5 +++-
 .../azure/eventhub/TestConsumeAzureEventHub.java   | 31 ++++++++++------------
 6 files changed, 87 insertions(+), 49 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index 2b0f2e3f43..92d34ddef9 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -93,8 +93,8 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
 public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
 
     private static final Pattern SAS_TOKEN_PATTERN = 
Pattern.compile("^\\?.*$");
-    private static final String 
FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = 
"DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
-    private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN 
= "BlobEndpoint=https://%s.blob.core.windows.net/;SharedAccessSignature=%s";;
+    private static final String 
FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = 
"DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.%s";
+    private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN 
= "BlobEndpoint=https://%s.blob.core.%s/;SharedAccessSignature=%s";;
 
     static final PropertyDescriptor NAMESPACE = new 
PropertyDescriptor.Builder()
             .name("event-hub-namespace")
@@ -112,7 +112,7 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(true)
             .build();
-    // TODO: Do we need to support custom service endpoints as 
GetAzureEventHub does? Is it possible?
+    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = 
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
     static final PropertyDescriptor ACCESS_POLICY_NAME = new 
PropertyDescriptor.Builder()
             .name("event-hub-shared-access-policy-name")
             .displayName("Shared Access Policy Name")
@@ -271,7 +271,7 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
 
     static {
         PROPERTIES = Collections.unmodifiableList(Arrays.asList(
-                NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, 
POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
+                NAMESPACE, EVENT_HUB_NAME, SERVICE_BUS_ENDPOINT, 
ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, 
CONSUMER_HOSTNAME,
                 RECORD_READER, RECORD_WRITER,
                 INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT,
                 STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, 
STORAGE_CONTAINER_NAME
@@ -293,6 +293,7 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
     private volatile String namespaceName;
     private volatile boolean isRecordReaderSet = false;
     private volatile boolean isRecordWriterSet = false;
+    private volatile String serviceBusEndpoint;
 
     /**
      * For unit test.
@@ -322,6 +323,13 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
         this.writerFactory = writerFactory;
     }
 
+    /**
+     * For unit test.
+     */
+    public void setServiceBusEndpoint(String serviceBusEndpoint) {
+        this.serviceBusEndpoint = serviceBusEndpoint;
+    }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTIES;
@@ -464,7 +472,8 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
         private void transferTo(Relationship relationship, ProcessSession 
session, StopWatch stopWatch,
                                 String eventHubName, String partitionId, 
String consumerGroup, FlowFile flowFile) {
             session.transfer(flowFile, relationship);
-            final String transitUri = "amqps://" + namespaceName + 
".servicebus.windows.net/" + eventHubName + "/ConsumerGroups/" + consumerGroup 
+ "/Partitions/" + partitionId;
+            final String transitUri = 
String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s",
+                    namespaceName, serviceBusEndpoint, eventHubName, 
consumerGroup, partitionId);
             session.getProvenanceReporter().receive(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
         }
 
@@ -652,13 +661,13 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
         final String connectionString;
         final boolean useManagedIdentity = 
context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
         if(useManagedIdentity) {
-            connectionString = 
AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName, 
eventHubName);
+            connectionString = 
AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName, 
serviceBusEndpoint, eventHubName);
         } else {
             final String sasName = 
context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
             validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
             final String sasKey = 
context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
             validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
-            connectionString = 
AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, 
eventHubName, sasName, sasKey);
+            connectionString = 
AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, 
serviceBusEndpoint, eventHubName, sasName, sasKey);
         }
 
         eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder
@@ -680,13 +689,15 @@ public class ConsumeAzureEventHub extends 
AbstractSessionFactoryProcessor {
         final String storageAccountName = 
context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
         validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
 
+        serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+        final String domainName = serviceBusEndpoint.replace(".servicebus.", 
"");
         final String storageAccountKey = 
context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
         final String storageSasToken = 
context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
 
         if (storageAccountKey != null) {
-            return 
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, 
storageAccountName, storageAccountKey);
+            return 
String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, 
storageAccountName, storageAccountKey, domainName);
         }
-        return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, 
storageAccountName, storageSasToken);
+        return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, 
storageAccountName, domainName, storageSasToken);
     }
 
     private String orDefault(String value, String defaultValue) {
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index f40f668e92..27c836ce3e 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -81,6 +81,9 @@ import 
org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
         @WritesAttribute(attribute = "eventhub.property.*", description = "The 
application properties of this message. IE: 'application' would be 
'eventhub.property.application'")
 })
 public class GetAzureEventHub extends AbstractProcessor {
+    private static final String TRANSIT_URI_FORMAT_STRING = 
"amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s";
+    private static final String FORMAT_STRING_FOR_CONECTION_BUILDER = 
"amqps://%s%s";
+
     static final PropertyDescriptor EVENT_HUB_NAME = new 
PropertyDescriptor.Builder()
             .name("Event Hub Name")
             .description("The name of the event hub to pull messages from")
@@ -94,15 +97,7 @@ public class GetAzureEventHub extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .required(true)
             .build();
-    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new 
PropertyDescriptor.Builder()
-            .name("Service Bus Endpoint")
-            .description("To support namespaces in non-standard Host URIs ( 
not .servicebus.windows.net,  ie .servicebus.chinacloudapi.cn) select from the 
drop down acceptable options ")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            
.allowableValues(".servicebus.windows.net",".servicebus.chinacloudapi.cn")
-            .defaultValue(".servicebus.windows.net")
-            .required(true)
-            .build();
+    static final PropertyDescriptor SERVICE_BUS_ENDPOINT 
=AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
     static final PropertyDescriptor ACCESS_POLICY = new 
PropertyDescriptor.Builder()
             .name("Shared Access Policy Name")
             .description("The name of the shared access policy. This policy 
must have Listen claims.")
@@ -315,12 +310,12 @@ public class GetAzureEventHub extends AbstractProcessor {
         final String connectionString;
 
         if(useManagedIdentity){
-            connectionString = 
AzureEventHubUtils.getManagedIdentityConnectionString(namespace,eventHubName);
+            connectionString = 
AzureEventHubUtils.getManagedIdentityConnectionString(namespace, 
serviceBusEndpoint, eventHubName);
         } else {
             final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
             final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
             connectionString = new ConnectionStringBuilder()
-                                    .setEndpoint(new 
URI("amqps://"+namespace+serviceBusEndpoint))
+                                    .setEndpoint(new 
URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace, 
serviceBusEndpoint)))
                                     .setEventHubName(eventHubName)
                                     .setSasKeyName(policyName)
                                     .setSasKey(policyKey).toString();
@@ -394,7 +389,8 @@ public class GetAzureEventHub extends AbstractProcessor {
                     final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
                     final String consumerGroup = 
context.getProperty(CONSUMER_GROUP).getValue();
                     final String serviceBusEndPoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
-                    final String transitUri = "amqps://" + namespace + 
serviceBusEndPoint + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + 
"/Partitions/" + partitionId;
+                    final String transitUri = 
String.format(TRANSIT_URI_FORMAT_STRING,
+                            namespace, serviceBusEndPoint, eventHubName, 
consumerGroup, partitionId);
                     session.getProvenanceReporter().receive(flowFile, 
transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                 }
             }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index 51bb5bf611..f588f76a59 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -76,6 +76,8 @@ import org.apache.nifi.util.StopWatch;
         + "Also please be aware that this processor creates a thread pool of 4 
threads for Event Hub Client. They will be extra threads other than the 
concurrent tasks scheduled for this processor.")
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
 public class PutAzureEventHub extends AbstractProcessor {
+    private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s%s/%s";
+
     static final PropertyDescriptor EVENT_HUB_NAME = new 
PropertyDescriptor.Builder()
             .name("Event Hub Name")
             .description("The name of the event hub to send to")
@@ -89,6 +91,7 @@ public class PutAzureEventHub extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .required(true)
             .build();
+    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = 
AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
     static final PropertyDescriptor ACCESS_POLICY = new 
PropertyDescriptor.Builder()
             .name("Shared Access Policy Name")
             .description("The name of the shared access policy. This policy 
must have Send claims.")
@@ -140,6 +143,7 @@ public class PutAzureEventHub extends AbstractProcessor {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.add(EVENT_HUB_NAME);
         _propertyDescriptors.add(NAMESPACE);
+        _propertyDescriptors.add(SERVICE_BUS_ENDPOINT);
         _propertyDescriptors.add(ACCESS_POLICY);
         _propertyDescriptors.add(POLICY_PRIMARY_KEY);
         _propertyDescriptors.add(USE_MANAGED_IDENTITY);
@@ -241,7 +245,9 @@ public class PutAzureEventHub extends AbstractProcessor {
                 if(flowFileResult.getResult() == REL_SUCCESS) {
                     final String namespace = 
context.getProperty(NAMESPACE).getValue();
                     final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
-                    session.getProvenanceReporter().send(flowFile, "amqps://" 
+ namespace + ".servicebus.windows.net" + "/" + eventHubName, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                    final String serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+                    final String transitUri = 
String.format(TRANSIT_URI_FORMAT_STRING, namespace, serviceBusEndpoint, 
eventHubName);
+                    session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                     session.transfer(flowFile, REL_SUCCESS);
 
                 } else {
@@ -330,9 +336,10 @@ public class PutAzureEventHub extends AbstractProcessor {
                 policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
             }
             final String namespace = context.getProperty(NAMESPACE).getValue();
+            final String serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
             final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
             for (int i = 0; i < numThreads; i++) {
-                final EventHubClient client = createEventHubClient(namespace, 
eventHubName, policyName, policyKey, executor);
+                final EventHubClient client = createEventHubClient(namespace, 
serviceBusEndpoint, eventHubName, policyName, policyKey, executor);
                 if(null != client) {
                     senderQueue.offer(client);
                 }
@@ -351,6 +358,7 @@ public class PutAzureEventHub extends AbstractProcessor {
      */
     protected EventHubClient createEventHubClient(
         final String namespace,
+        final String serviceBusEndpoint,
         final String eventHubName,
         final String policyName,
         final String policyKey,
@@ -361,9 +369,9 @@ public class PutAzureEventHub extends AbstractProcessor {
             EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
             final String connectionString;
             if(policyName == AzureEventHubUtils.MANAGED_IDENTITY_POLICY) {
-                connectionString = 
AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName);
+                connectionString = 
AzureEventHubUtils.getManagedIdentityConnectionString(namespace, 
serviceBusEndpoint, eventHubName);
             } else{
-                connectionString = getConnectionString(namespace, 
eventHubName, policyName, policyKey);
+                connectionString = getConnectionString(namespace, 
serviceBusEndpoint, eventHubName, policyName, policyKey);
             }
             return 
EventHubClient.createFromConnectionStringSync(connectionString, executor);
         } catch (IOException | EventHubException | 
IllegalConnectionStringFormatException e) {
@@ -372,8 +380,8 @@ public class PutAzureEventHub extends AbstractProcessor {
         }
     }
 
-    protected String getConnectionString(final String namespace, final String 
eventHubName, final String policyName, final String policyKey){
-        return 
AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, 
eventHubName, policyName, policyKey);
+    protected String getConnectionString(final String namespace, final String 
serviceBusEndpoint, final String eventHubName, final String policyName, final 
String policyKey){
+        return 
AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, 
serviceBusEndpoint, eventHubName, policyName, policyKey);
     }
 
     /**
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
index f71f6c8495..cb19f2e54b 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
 import com.microsoft.azure.eventhubs.EventData;
 
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -33,6 +34,10 @@ import org.apache.nifi.processor.util.StandardValidators;
 public final class AzureEventHubUtils {
 
     public static final String MANAGED_IDENTITY_POLICY = 
ConnectionStringBuilder.MANAGED_IDENTITY_AUTHENTICATION;
+    public static final AllowableValue AZURE_ENDPOINT = new 
AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for 
general use");
+    public static final AllowableValue AZURE_CHINA_ENDPOINT = new 
AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus 
endpoint for China");
+    public static final AllowableValue AZURE_GERMANY_ENDPOINT = new 
AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint 
for Germany");
+    public static final AllowableValue AZURE_US_GOV_ENDPOINT = new 
AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", 
"Servicebus endpoint for US Government");
 
     public static final PropertyDescriptor POLICY_PRIMARY_KEY = new 
PropertyDescriptor.Builder()
         .name("Shared Access Policy Primary Key")
@@ -50,6 +55,17 @@ public final class AzureEventHubUtils {
         .required(false).defaultValue("false").allowableValues("true", "false")
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
 
+    public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("To support namespaces not in the default windows.net 
domain.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AzureEventHubUtils.AZURE_ENDPOINT, 
AzureEventHubUtils.AZURE_CHINA_ENDPOINT,
+                    AzureEventHubUtils.AZURE_GERMANY_ENDPOINT, 
AzureEventHubUtils.AZURE_US_GOV_ENDPOINT)
+            .defaultValue(AzureEventHubUtils.AZURE_ENDPOINT.getValue())
+            .required(true)
+            .build();
+
     public static List<ValidationResult> customValidate(PropertyDescriptor 
accessPolicyDescriptor,
         PropertyDescriptor policyKeyDescriptor,
         ValidationContext context) {
@@ -79,13 +95,15 @@ public final class AzureEventHubUtils {
         return retVal;
     }
 
-    public static String getManagedIdentityConnectionString(final String 
namespace, final String eventHubName){
-        return new 
ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName)
+    public static String getManagedIdentityConnectionString(final String 
namespace, final String domainName, final String eventHubName){
+        return new ConnectionStringBuilder()
+                    .setEndpoint(namespace, removeStartingDotFrom(domainName))
+                    .setEventHubName(eventHubName)
                     .setAuthentication(MANAGED_IDENTITY_POLICY).toString();
     }
-    public static String getSharedAccessSignatureConnectionString(final String 
namespace, final String eventHubName, final String sasName, final String 
sasKey) {
+    public static String getSharedAccessSignatureConnectionString(final String 
namespace, final String domainName, final String eventHubName, final String 
sasName, final String sasKey) {
         return new ConnectionStringBuilder()
-                    .setNamespaceName(namespace)
+                    .setEndpoint(namespace, removeStartingDotFrom(domainName))
                     .setEventHubName(eventHubName)
                     .setSasKeyName(sasName)
                     .setSasKey(sasKey).toString();
@@ -103,4 +121,9 @@ public final class AzureEventHubUtils {
 
         return properties;
     }
+
+    private static String removeStartingDotFrom(final String domainName) {
+        return domainName.replaceFirst("^\\.", "");
+    }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index d83aae4813..9f0bf5f0c8 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -328,6 +328,7 @@ public class PutAzureEventHubTest {
         @Override
         protected EventHubClient createEventHubClient(
                 final String namespace,
+                final String serviceBusEndpoint,
                 final String eventHubName,
                 final String policyName,
                 final String policyKey,
@@ -346,6 +347,7 @@ public class PutAzureEventHubTest {
         @Override
         protected EventHubClient createEventHubClient(
                 final String namespace,
+                final String serviceBusEndpoint,
                 final String eventHubName,
                 final String policyName,
                 final String policyKey,
@@ -355,7 +357,7 @@ public class PutAzureEventHubTest {
     }
     private static class BogusConnectionStringMockPutAzureEventHub extends 
PutAzureEventHub{
         @Override
-        protected String getConnectionString(final String namespace, final 
String eventHubName, final String policyName, final String policyKey){
+        protected String getConnectionString(final String namespace, final 
String serviceBusEndpoint, final String eventHubName, final String policyName, 
final String policyKey){
             return "Bogus Connection String";
         }
     }
@@ -371,6 +373,7 @@ public class PutAzureEventHubTest {
         @Override
         protected EventHubClient createEventHubClient(
                 final String namespace,
+                final String serviceBusEndpoint,
                 final String eventHubName,
                 final String policyName,
                 final String policyKey,
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 659e9a6f9a..85dead6e13 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -78,6 +78,10 @@ public class TestConsumeAzureEventHub {
     private static final String storageAccountName = "test-sa";
     private static final String storageAccountKey = "test-sa-key";
     private static final String storageSasToken = "?test-sa-token";
+    private static final String serviceBusEndpoint = ".endpoint";
+
+    private static final String EXPECTED_TRANSIT_URI = "amqps://namespace" + 
serviceBusEndpoint + "/" +
+            
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id";
 
     private ConsumeAzureEventHub.EventProcessor eventProcessor;
     private MockProcessSession processSession;
@@ -98,6 +102,7 @@ public class TestConsumeAzureEventHub {
         final ProcessSessionFactory processSessionFactory = 
Mockito.mock(ProcessSessionFactory.class);
         processor.setProcessSessionFactory(processSessionFactory);
         processor.setNamespaceName("namespace");
+        processor.setServiceBusEndpoint(serviceBusEndpoint);
 
         sharedState = new SharedSessionState(processor, new AtomicLong(0));
         processSession = new MockProcessSession(sharedState, processor);
@@ -130,7 +135,6 @@ public class TestConsumeAzureEventHub {
         testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, 
storageAccountName);
         testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, 
storageAccountKey);
         testRunner.assertNotValid();
-
         
testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
         testRunner.assertValid();
     }
@@ -228,8 +232,7 @@ public class TestConsumeAzureEventHub {
         assertEquals(1, provenanceEvents.size());
         final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
         assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent1.getEventType());
-        assertEquals("amqps://namespace.servicebus.windows.net/" +
-                
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent1.getTransitUri());
+        assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
     }
 
     @Test
@@ -340,8 +343,8 @@ public class TestConsumeAzureEventHub {
             default:
                 final List<Record> recordList1 = 
addEndRecord.apply(recordList.subList(0, throwExceptionAt));
                 final List<Record> recordList2 = 
addEndRecord.apply(recordList.subList(throwExceptionAt + 1, recordList.size()));
-                final Record[] records1 = recordList1.toArray(new Record[0]);
-                final Record[] records2 = recordList2.toArray(new Record[0]);
+                final Record[] records1 = recordList1.toArray(new 
Record[recordList1.size()]);
+                final Record[] records2 = recordList2.toArray(new 
Record[recordList2.size()]);
                 when(reader.nextRecord())
                         .thenReturn(records1[0], Arrays.copyOfRange(records1, 
1, records1.length))
                         .thenThrow(new MalformedRecordException("Simulating 
Record parse failure."))
@@ -374,8 +377,7 @@ public class TestConsumeAzureEventHub {
         assertEquals(1, provenanceEvents.size());
         final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
         assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent1.getEventType());
-        assertEquals("amqps://namespace.servicebus.windows.net/" +
-                
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent1.getTransitUri());
+        assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
     }
 
     @Test
@@ -413,13 +415,11 @@ public class TestConsumeAzureEventHub {
 
         final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
         assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent1.getEventType());
-        assertEquals("amqps://namespace.servicebus.windows.net/" +
-                
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent1.getTransitUri());
+        assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
 
         final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1);
         assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent2.getEventType());
-        assertEquals("amqps://namespace.servicebus.windows.net/" +
-                
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent2.getTransitUri());
+        assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent2.getTransitUri());
     }
 
     @Test
@@ -450,8 +450,7 @@ public class TestConsumeAzureEventHub {
 
         final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
         assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent1.getEventType());
-        assertEquals("amqps://namespace.servicebus.windows.net/" +
-                
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent1.getTransitUri());
+        assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
     }
 
     @Test
@@ -489,12 +488,10 @@ public class TestConsumeAzureEventHub {
 
         final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
         assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent1.getEventType());
-        assertEquals("amqps://namespace.servicebus.windows.net/" +
-                
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent1.getTransitUri());
+        assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri());
 
         final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1);
         assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent2.getEventType());
-        assertEquals("amqps://namespace.servicebus.windows.net/" +
-                
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", 
provenanceEvent2.getTransitUri());
+        assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent2.getTransitUri());
     }
 }

Reply via email to