Repository: nifi Updated Branches: refs/heads/master 4d0667380 -> 44fdc0e4e
NIFI-3640 uri eventhub changes This closes #1617 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/44fdc0e4 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/44fdc0e4 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/44fdc0e4 Branch: refs/heads/master Commit: 44fdc0e4ef3f0e9e99c2e39c1994e641ce02d146 Parents: 4d06673 Author: Joseph Niemiec <jniemie8> Authored: Thu Mar 23 11:19:56 2017 -0400 Committer: Jeff Storck <[email protected]> Committed: Thu May 25 13:14:51 2017 -0400 ---------------------------------------------------------------------- .../azure/eventhub/GetAzureEventHub.java | 22 +++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/44fdc0e4/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 12ea1ba..69d5586 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -40,6 +40,8 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -81,6 +83,15 @@ public class GetAzureEventHub extends AbstractProcessor { .expressionLanguageSupported(false) .required(true) .build(); + static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder() + .name("Service Bus Endpoint") + .description("To support Namespaces in non-standard Host URIs ( not .servicebus.windows.net, ie .servicebus.chinacloudapi.cn) select from the drop down acceptable options ") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .allowableValues(".servicebus.windows.net",".servicebus.chinacloudapi.cn") + .defaultValue(".servicebus.windows.net") + .required(true) + .build(); static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.") @@ -159,6 +170,7 @@ public class GetAzureEventHub extends AbstractProcessor { static { List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.add(EVENT_HUB_NAME); + _propertyDescriptors.add(SERVICE_BUS_ENDPOINT); _propertyDescriptors.add(NAMESPACE); _propertyDescriptors.add(ACCESS_POLICY); _propertyDescriptors.add(POLICY_PRIMARY_KEY); @@ -268,7 +280,7 @@ public class GetAzureEventHub extends AbstractProcessor { } @OnScheduled - public void onScheduled(final ProcessContext context) throws ProcessException { + public void onScheduled(final ProcessContext context) throws ProcessException, URISyntaxException { final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>(); for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) { partitionNames.add(String.valueOf(i)); @@ -279,6 +291,9 @@ public class GetAzureEventHub extends AbstractProcessor { final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); + + if(context.getProperty(ENQUEUE_TIME).isSet()) { configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString()); @@ -296,7 +311,7 @@ public class GetAzureEventHub extends AbstractProcessor { receiverFetchTimeout = null; } - final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString(); + final String connectionString = new ConnectionStringBuilder(new URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName, policyKey).toString(); setupReceiver(connectionString); } @@ -346,7 +361,8 @@ public class GetAzureEventHub extends AbstractProcessor { final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue(); - final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; + final String serviceBusEndPoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); + final String transitUri = "amqps://" + namespace + serviceBusEndPoint + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } }
