This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ba4db79196e5b36fdece1fb91b572dfc96c6530f Author: Timea Barna <[email protected]> AuthorDate: Wed Aug 11 09:29:56 2021 +0200 NIFI-8677 Added endpoint suffix for Azure EventHub Processors This closes #5303 Signed-off-by: Joey Frazee <[email protected]> --- .../azure/eventhub/ConsumeAzureEventHub.java | 29 +++++++++++++------- .../azure/eventhub/GetAzureEventHub.java | 20 ++++++-------- .../azure/eventhub/PutAzureEventHub.java | 20 +++++++++----- .../azure/eventhub/utils/AzureEventHubUtils.java | 31 +++++++++++++++++++--- .../azure/eventhub/PutAzureEventHubTest.java | 5 +++- .../azure/eventhub/TestConsumeAzureEventHub.java | 31 ++++++++++------------ 6 files changed, 87 insertions(+), 49 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index 2b0f2e3f43..92d34ddef9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -93,8 +93,8 @@ import static org.apache.nifi.util.StringUtils.isEmpty; public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { private static final Pattern SAS_TOKEN_PATTERN = Pattern.compile("^\\?.*$"); - private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; - private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN = "BlobEndpoint=https://%s.blob.core.windows.net/;SharedAccessSignature=%s"; + private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.%s"; + private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN = "BlobEndpoint=https://%s.blob.core.%s/;SharedAccessSignature=%s"; static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() .name("event-hub-namespace") @@ -112,7 +112,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(true) .build(); - // TODO: Do we need to support custom service endpoints as GetAzureEventHub does? Is it possible? + static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder() .name("event-hub-shared-access-policy-name") .displayName("Shared Access Policy Name") @@ -271,7 +271,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { static { PROPERTIES = Collections.unmodifiableList(Arrays.asList( - NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME, + NAMESPACE, EVENT_HUB_NAME, SERVICE_BUS_ENDPOINT, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME, RECORD_READER, RECORD_WRITER, INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT, STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, STORAGE_CONTAINER_NAME @@ -293,6 +293,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { private volatile String namespaceName; private volatile boolean isRecordReaderSet = false; private volatile boolean isRecordWriterSet = false; + private volatile String serviceBusEndpoint; /** * For unit test. @@ -322,6 +323,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { this.writerFactory = writerFactory; } + /** + * For unit test. + */ + public void setServiceBusEndpoint(String serviceBusEndpoint) { + this.serviceBusEndpoint = serviceBusEndpoint; + } + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return PROPERTIES; @@ -464,7 +472,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { private void transferTo(Relationship relationship, ProcessSession session, StopWatch stopWatch, String eventHubName, String partitionId, String consumerGroup, FlowFile flowFile) { session.transfer(flowFile, relationship); - final String transitUri = "amqps://" + namespaceName + ".servicebus.windows.net/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; + final String transitUri = String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s", + namespaceName, serviceBusEndpoint, eventHubName, consumerGroup, partitionId); session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } @@ -652,13 +661,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { final String connectionString; final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); if(useManagedIdentity) { - connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName, eventHubName); + connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName, serviceBusEndpoint, eventHubName); } else { final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue(); validateRequiredProperty(ACCESS_POLICY_NAME, sasName); final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue(); validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey); - connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, eventHubName, sasName, sasKey); + connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, serviceBusEndpoint, eventHubName, sasName, sasKey); } eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder @@ -680,13 +689,15 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName); + serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); + final String domainName = serviceBusEndpoint.replace(".servicebus.", ""); final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); final String storageSasToken = context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue(); if (storageAccountKey != null) { - return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey); + return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey, domainName); } - return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, storageSasToken); + return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, domainName, storageSasToken); } private String orDefault(String value, String defaultValue) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index f40f668e92..27c836ce3e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -81,6 +81,9 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'") }) public class GetAzureEventHub extends AbstractProcessor { + private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s"; + private static final String FORMAT_STRING_FOR_CONECTION_BUILDER = "amqps://%s%s"; + static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() .name("Event Hub Name") .description("The name of the event hub to pull messages from") @@ -94,15 +97,7 @@ public class GetAzureEventHub extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); - static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder() - .name("Service Bus Endpoint") - .description("To support namespaces in non-standard Host URIs ( not .servicebus.windows.net, ie .servicebus.chinacloudapi.cn) select from the drop down acceptable options ") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues(".servicebus.windows.net",".servicebus.chinacloudapi.cn") - .defaultValue(".servicebus.windows.net") - .required(true) - .build(); + static final PropertyDescriptor SERVICE_BUS_ENDPOINT =AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the shared access policy. This policy must have Listen claims.") @@ -315,12 +310,12 @@ public class GetAzureEventHub extends AbstractProcessor { final String connectionString; if(useManagedIdentity){ - connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace,eventHubName); + connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, serviceBusEndpoint, eventHubName); } else { final String policyName = context.getProperty(ACCESS_POLICY).getValue(); final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); connectionString = new ConnectionStringBuilder() - .setEndpoint(new URI("amqps://"+namespace+serviceBusEndpoint)) + .setEndpoint(new URI(String.format(FORMAT_STRING_FOR_CONECTION_BUILDER, namespace, serviceBusEndpoint))) .setEventHubName(eventHubName) .setSasKeyName(policyName) .setSasKey(policyKey).toString(); @@ -394,7 +389,8 @@ public class GetAzureEventHub extends AbstractProcessor { final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue(); final String serviceBusEndPoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); - final String transitUri = "amqps://" + namespace + serviceBusEndPoint + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; + final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, + namespace, serviceBusEndPoint, eventHubName, consumerGroup, partitionId); session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java index 51bb5bf611..f588f76a59 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -76,6 +76,8 @@ import org.apache.nifi.util.StopWatch; + "Also please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.") @SystemResourceConsideration(resource = SystemResource.MEMORY) public class PutAzureEventHub extends AbstractProcessor { + private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s%s/%s"; + static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() .name("Event Hub Name") .description("The name of the event hub to send to") @@ -89,6 +91,7 @@ public class PutAzureEventHub extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); + static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the shared access policy. This policy must have Send claims.") @@ -140,6 +143,7 @@ public class PutAzureEventHub extends AbstractProcessor { List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.add(EVENT_HUB_NAME); _propertyDescriptors.add(NAMESPACE); + _propertyDescriptors.add(SERVICE_BUS_ENDPOINT); _propertyDescriptors.add(ACCESS_POLICY); _propertyDescriptors.add(POLICY_PRIMARY_KEY); _propertyDescriptors.add(USE_MANAGED_IDENTITY); @@ -241,7 +245,9 @@ public class PutAzureEventHub extends AbstractProcessor { if(flowFileResult.getResult() == REL_SUCCESS) { final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); - session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); + final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, namespace, serviceBusEndpoint, eventHubName); + session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); } else { @@ -330,9 +336,10 @@ public class PutAzureEventHub extends AbstractProcessor { policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); } final String namespace = context.getProperty(NAMESPACE).getValue(); + final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); for (int i = 0; i < numThreads; i++) { - final EventHubClient client = createEventHubClient(namespace, eventHubName, policyName, policyKey, executor); + final EventHubClient client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, executor); if(null != client) { senderQueue.offer(client); } @@ -351,6 +358,7 @@ public class PutAzureEventHub extends AbstractProcessor { */ protected EventHubClient createEventHubClient( final String namespace, + final String serviceBusEndpoint, final String eventHubName, final String policyName, final String policyKey, @@ -361,9 +369,9 @@ public class PutAzureEventHub extends AbstractProcessor { EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1"; final String connectionString; if(policyName == AzureEventHubUtils.MANAGED_IDENTITY_POLICY) { - connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName); + connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, serviceBusEndpoint, eventHubName); } else{ - connectionString = getConnectionString(namespace, eventHubName, policyName, policyKey); + connectionString = getConnectionString(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey); } return EventHubClient.createFromConnectionStringSync(connectionString, executor); } catch (IOException | EventHubException | IllegalConnectionStringFormatException e) { @@ -372,8 +380,8 @@ public class PutAzureEventHub extends AbstractProcessor { } } - protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){ - return AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, eventHubName, policyName, policyKey); + protected String getConnectionString(final String namespace, final String serviceBusEndpoint, final String eventHubName, final String policyName, final String policyKey){ + return AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey); } /** diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java index f71f6c8495..cb19f2e54b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java @@ -24,6 +24,7 @@ import java.util.HashMap; import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventData; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -33,6 +34,10 @@ import org.apache.nifi.processor.util.StandardValidators; public final class AzureEventHubUtils { public static final String MANAGED_IDENTITY_POLICY = ConnectionStringBuilder.MANAGED_IDENTITY_AUTHENTICATION; + public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for general use"); + public static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus endpoint for China"); + public static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint for Germany"); + public static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "Servicebus endpoint for US Government"); public static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() .name("Shared Access Policy Primary Key") @@ -50,6 +55,17 @@ public final class AzureEventHubUtils { .required(false).defaultValue("false").allowableValues("true", "false") .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build(); + public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder() + .name("Service Bus Endpoint") + .description("To support namespaces not in the default windows.net domain.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues(AzureEventHubUtils.AZURE_ENDPOINT, AzureEventHubUtils.AZURE_CHINA_ENDPOINT, + AzureEventHubUtils.AZURE_GERMANY_ENDPOINT, AzureEventHubUtils.AZURE_US_GOV_ENDPOINT) + .defaultValue(AzureEventHubUtils.AZURE_ENDPOINT.getValue()) + .required(true) + .build(); + public static List<ValidationResult> customValidate(PropertyDescriptor accessPolicyDescriptor, PropertyDescriptor policyKeyDescriptor, ValidationContext context) { @@ -79,13 +95,15 @@ public final class AzureEventHubUtils { return retVal; } - public static String getManagedIdentityConnectionString(final String namespace, final String eventHubName){ - return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName) + public static String getManagedIdentityConnectionString(final String namespace, final String domainName, final String eventHubName){ + return new ConnectionStringBuilder() + .setEndpoint(namespace, removeStartingDotFrom(domainName)) + .setEventHubName(eventHubName) .setAuthentication(MANAGED_IDENTITY_POLICY).toString(); } - public static String getSharedAccessSignatureConnectionString(final String namespace, final String eventHubName, final String sasName, final String sasKey) { + public static String getSharedAccessSignatureConnectionString(final String namespace, final String domainName, final String eventHubName, final String sasName, final String sasKey) { return new ConnectionStringBuilder() - .setNamespaceName(namespace) + .setEndpoint(namespace, removeStartingDotFrom(domainName)) .setEventHubName(eventHubName) .setSasKeyName(sasName) .setSasKey(sasKey).toString(); @@ -103,4 +121,9 @@ public final class AzureEventHubUtils { return properties; } + + private static String removeStartingDotFrom(final String domainName) { + return domainName.replaceFirst("^\\.", ""); + } + } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java index d83aae4813..9f0bf5f0c8 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java @@ -328,6 +328,7 @@ public class PutAzureEventHubTest { @Override protected EventHubClient createEventHubClient( final String namespace, + final String serviceBusEndpoint, final String eventHubName, final String policyName, final String policyKey, @@ -346,6 +347,7 @@ public class PutAzureEventHubTest { @Override protected EventHubClient createEventHubClient( final String namespace, + final String serviceBusEndpoint, final String eventHubName, final String policyName, final String policyKey, @@ -355,7 +357,7 @@ public class PutAzureEventHubTest { } private static class BogusConnectionStringMockPutAzureEventHub extends PutAzureEventHub{ @Override - protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){ + protected String getConnectionString(final String namespace, final String serviceBusEndpoint, final String eventHubName, final String policyName, final String policyKey){ return "Bogus Connection String"; } } @@ -371,6 +373,7 @@ public class PutAzureEventHubTest { @Override protected EventHubClient createEventHubClient( final String namespace, + final String serviceBusEndpoint, final String eventHubName, final String policyName, final String policyKey, diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index 659e9a6f9a..85dead6e13 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -78,6 +78,10 @@ public class TestConsumeAzureEventHub { private static final String storageAccountName = "test-sa"; private static final String storageAccountKey = "test-sa-key"; private static final String storageSasToken = "?test-sa-token"; + private static final String serviceBusEndpoint = ".endpoint"; + + private static final String EXPECTED_TRANSIT_URI = "amqps://namespace" + serviceBusEndpoint + "/" + + "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id"; private ConsumeAzureEventHub.EventProcessor eventProcessor; private MockProcessSession processSession; @@ -98,6 +102,7 @@ public class TestConsumeAzureEventHub { final ProcessSessionFactory processSessionFactory = Mockito.mock(ProcessSessionFactory.class); processor.setProcessSessionFactory(processSessionFactory); processor.setNamespaceName("namespace"); + processor.setServiceBusEndpoint(serviceBusEndpoint); sharedState = new SharedSessionState(processor, new AtomicLong(0)); processSession = new MockProcessSession(sharedState, processor); @@ -130,7 +135,6 @@ public class TestConsumeAzureEventHub { testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName); testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey); testRunner.assertNotValid(); - testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true"); testRunner.assertValid(); } @@ -228,8 +232,7 @@ public class TestConsumeAzureEventHub { assertEquals(1, provenanceEvents.size()); final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0); assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType()); - assertEquals("amqps://namespace.servicebus.windows.net/" + - "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri()); + assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri()); } @Test @@ -340,8 +343,8 @@ public class TestConsumeAzureEventHub { default: final List<Record> recordList1 = addEndRecord.apply(recordList.subList(0, throwExceptionAt)); final List<Record> recordList2 = addEndRecord.apply(recordList.subList(throwExceptionAt + 1, recordList.size())); - final Record[] records1 = recordList1.toArray(new Record[0]); - final Record[] records2 = recordList2.toArray(new Record[0]); + final Record[] records1 = recordList1.toArray(new Record[recordList1.size()]); + final Record[] records2 = recordList2.toArray(new Record[recordList2.size()]); when(reader.nextRecord()) .thenReturn(records1[0], Arrays.copyOfRange(records1, 1, records1.length)) .thenThrow(new MalformedRecordException("Simulating Record parse failure.")) @@ -374,8 +377,7 @@ public class TestConsumeAzureEventHub { assertEquals(1, provenanceEvents.size()); final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0); assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType()); - assertEquals("amqps://namespace.servicebus.windows.net/" + - "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri()); + assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri()); } @Test @@ -413,13 +415,11 @@ public class TestConsumeAzureEventHub { final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0); assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType()); - assertEquals("amqps://namespace.servicebus.windows.net/" + - "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri()); + assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri()); final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1); assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent2.getEventType()); - assertEquals("amqps://namespace.servicebus.windows.net/" + - "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent2.getTransitUri()); + assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent2.getTransitUri()); } @Test @@ -450,8 +450,7 @@ public class TestConsumeAzureEventHub { final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0); assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType()); - assertEquals("amqps://namespace.servicebus.windows.net/" + - "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri()); + assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri()); } @Test @@ -489,12 +488,10 @@ public class TestConsumeAzureEventHub { final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0); assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType()); - assertEquals("amqps://namespace.servicebus.windows.net/" + - "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri()); + assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent1.getTransitUri()); final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1); assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent2.getEventType()); - assertEquals("amqps://namespace.servicebus.windows.net/" + - "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent2.getTransitUri()); + assertEquals(EXPECTED_TRANSIT_URI, provenanceEvent2.getTransitUri()); } }
