Repository: nifi
Updated Branches:
  refs/heads/master 4d0667380 -> 44fdc0e4e


NIFI-3640 uri eventhub changes

This closes #1617


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/44fdc0e4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/44fdc0e4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/44fdc0e4

Branch: refs/heads/master
Commit: 44fdc0e4ef3f0e9e99c2e39c1994e641ce02d146
Parents: 4d06673
Author: Joseph Niemiec <jniemie8>
Authored: Thu Mar 23 11:19:56 2017 -0400
Committer: Jeff Storck <[email protected]>
Committed: Thu May 25 13:14:51 2017 -0400

----------------------------------------------------------------------
 .../azure/eventhub/GetAzureEventHub.java        | 22 +++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/44fdc0e4/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 12ea1ba..69d5586 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -40,6 +40,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -81,6 +83,15 @@ public class GetAzureEventHub extends AbstractProcessor {
             .expressionLanguageSupported(false)
             .required(true)
             .build();
+    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("To support Namespaces in non-standard Host URIs ( 
not .servicebus.windows.net,  ie .servicebus.chinacloudapi.cn) select from the 
drop down acceptable options ")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            
.allowableValues(".servicebus.windows.net",".servicebus.chinacloudapi.cn")
+            .defaultValue(".servicebus.windows.net")
+            .required(true)
+            .build();
     static final PropertyDescriptor ACCESS_POLICY = new 
PropertyDescriptor.Builder()
             .name("Shared Access Policy Name")
             .description("The name of the Event Hub Shared Access Policy. This 
Policy must have Listen permissions.")
@@ -159,6 +170,7 @@ public class GetAzureEventHub extends AbstractProcessor {
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.add(EVENT_HUB_NAME);
+        _propertyDescriptors.add(SERVICE_BUS_ENDPOINT);
         _propertyDescriptors.add(NAMESPACE);
         _propertyDescriptors.add(ACCESS_POLICY);
         _propertyDescriptors.add(POLICY_PRIMARY_KEY);
@@ -268,7 +280,7 @@ public class GetAzureEventHub extends AbstractProcessor {
     }
 
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws 
ProcessException {
+    public void onScheduled(final ProcessContext context) throws 
ProcessException, URISyntaxException {
         final BlockingQueue<String> partitionNames = new 
LinkedBlockingQueue<>();
         for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); 
i++) {
             partitionNames.add(String.valueOf(i));
@@ -279,6 +291,9 @@ public class GetAzureEventHub extends AbstractProcessor {
         final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
         final String namespace = context.getProperty(NAMESPACE).getValue();
         final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
+        final String serviceBusEndpoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+
+
 
         if(context.getProperty(ENQUEUE_TIME).isSet()) {
             configuredEnqueueTime = 
Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
@@ -296,7 +311,7 @@ public class GetAzureEventHub extends AbstractProcessor {
             receiverFetchTimeout = null;
         }
 
-        final String connectionString = new ConnectionStringBuilder(namespace, 
eventHubName, policyName, policyKey).toString();
+        final String connectionString = new ConnectionStringBuilder(new 
URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName, 
policyKey).toString();
         setupReceiver(connectionString);
     }
 
@@ -346,7 +361,8 @@ public class GetAzureEventHub extends AbstractProcessor {
                     final String namespace = 
context.getProperty(NAMESPACE).getValue();
                     final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
                     final String consumerGroup = 
context.getProperty(CONSUMER_GROUP).getValue();
-                    final String transitUri = "amqps://" + namespace + 
".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + 
consumerGroup + "/Partitions/" + partitionId;
+                    final String serviceBusEndPoint = 
context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+                    final String transitUri = "amqps://" + namespace + 
serviceBusEndPoint + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + 
"/Partitions/" + partitionId;
                     session.getProvenanceReporter().receive(flowFile, 
transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                 }
             }

Reply via email to