Repository: samza
Updated Branches:
  refs/heads/master 956cf412a -> 272aa32e3


Making event hub configs Samza compliant

Fixes for Bugs

- SAMZA-1571 Make Eventhubs system configs compatible with Samza standalone.
- SAMZA-1624 EventHub system should prefix the SasKey and SasToken configs 
with "sensitive"
- SAMZA-1625 EventHub systemAdmin is swallowing exceptions
- SAMZA-1626 EventHub system admin is not returning the metadata for all the 
requested SSPs

Description

1. Right now the event hub system doesn't follow Samza's config convention of 
marking secrets with the "sensitive" prefix so that they are masked before 
they are logged.
2. Event hub configs use the old systems.<systemName>.streams.<streamName> 
prefix, which is blacklisted in Samza standalone, so these configs are moved 
to the newer streams.<streamId> prefix.
3. Wrapping the underlying exception properly in the SamzaException in 
EventHubSystemAdmin
4. Porting Bharat's fix to return the metadata for all the requested SSPs 
in EventHubSystemAdmin

Author: Srinivasulu Punuru <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #453 from srinipunuru/eh.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/272aa32e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/272aa32e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/272aa32e

Branch: refs/heads/master
Commit: 272aa32e3045a7eb4b429e7790931a914317eef3
Parents: 956cf41
Author: Srinivasulu Punuru <[email protected]>
Authored: Thu Mar 22 17:08:11 2018 -0700
Committer: xiliu <[email protected]>
Committed: Thu Mar 22 17:08:11 2018 -0700

----------------------------------------------------------------------
 .../samza/system/eventhub/EventHubConfig.java   | 30 ++++++------
 .../eventhub/admin/EventHubSystemAdmin.java     | 10 ++--
 .../eventhub/MockEventHubConfigFactory.java     | 18 ++++----
 .../consumer/TestEventHubSystemConsumer.java    | 48 ++++++++++----------
 .../producer/TestEventHubSystemProducer.java    | 24 +++++-----
 5 files changed, 66 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 5d83911..e9c383a 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -36,15 +36,15 @@ public class EventHubConfig extends MapConfig {
 
   public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list";
 
-  public static final String CONFIG_STREAM_NAMESPACE = 
"systems.%s.streams.%s.eventhubs.namespace";
+  public static final String CONFIG_STREAM_NAMESPACE = 
"streams.%s.eventhubs.namespace";
 
-  public static final String CONFIG_STREAM_ENTITYPATH = 
"systems.%s.streams.%s.eventhubs.entitypath";
+  public static final String CONFIG_STREAM_ENTITYPATH = 
"streams.%s.eventhubs.entitypath";
 
-  public static final String CONFIG_STREAM_SAS_KEY_NAME = 
"systems.%s.streams.%s.eventhubs.sas.keyname";
+  public static final String CONFIG_STREAM_SAS_KEY_NAME = 
Config.SENSITIVE_PREFIX + "streams.%s.eventhubs.sas.keyname";
 
-  public static final String CONFIG_STREAM_SAS_TOKEN = 
"systems.%s.streams.%s.eventhubs.sas.token";
+  public static final String CONFIG_STREAM_SAS_TOKEN = Config.SENSITIVE_PREFIX 
+ "streams.%s.eventhubs.sas.token";
 
-  public static final String CONFIG_STREAM_CONSUMER_GROUP = 
"systems.%s.streams.%s.eventhubs.consumer.group";
+  public static final String CONFIG_STREAM_CONSUMER_GROUP = 
"streams.%s.eventhubs.consumer.group";
   public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = 
EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
 
   public static final String CONFIG_PRODUCER_PARTITION_METHOD = 
"systems.%s.eventhubs.partition.method";
@@ -71,18 +71,18 @@ public class EventHubConfig extends MapConfig {
             .forEach((streamId) -> 
physcialToId.put(streamConfig.getPhysicalName(streamId), streamId));
   }
 
-  private String getFromStreamIdOrName(String configName, String systemName, 
String streamName, String defaultString) {
-    String result = getFromStreamIdOrName(configName, systemName, streamName);
+  private String getFromStreamIdOrName(String configName, String streamName, 
String defaultString) {
+    String result = getFromStreamIdOrName(configName, streamName);
     if (result == null) {
       return defaultString;
     }
     return result;
   }
 
-  private String getFromStreamIdOrName(String configName, String systemName, 
String streamName) {
+  private String getFromStreamIdOrName(String configName, String streamName) {
     String streamId = getStreamId(streamName);
-    return get(String.format(configName, systemName, streamId),
-            streamId.equals(streamName) ? null : get(String.format(configName, 
systemName, streamName)));
+    return get(String.format(configName, streamId),
+            streamId.equals(streamName) ? null : get(String.format(configName, 
streamName)));
   }
 
   private String validateRequiredConfig(String value, String fieldName, String 
systemName, String streamName) {
@@ -122,7 +122,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs namespace
    */
   public String getStreamNamespace(String systemName, String streamName) {
-    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, 
systemName, streamName),
+    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, 
streamName),
             "Namespace", systemName, streamName);
   }
 
@@ -134,7 +134,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs entity path
    */
   public String getStreamEntityPath(String systemName, String streamName) {
-    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, 
systemName, streamName),
+    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, 
streamName),
             "EntityPath", systemName, streamName);
   }
 
@@ -146,7 +146,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs SAS key name
    */
   public String getStreamSasKeyName(String systemName, String streamName) {
-    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName),
+    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, 
streamName),
             "SASKeyName", systemName, streamName);
   }
 
@@ -158,7 +158,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs SAS token
    */
   public String getStreamSasToken(String systemName, String streamName) {
-    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName),
+    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, 
streamName),
             "SASToken", systemName, streamName);
   }
 
@@ -170,7 +170,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs consumer group
    */
   public String getStreamConsumerGroup(String systemName, String streamName) {
-    return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, systemName, 
streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
+    return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, streamName, 
DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 91d504c..5564747 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -104,9 +104,10 @@ public class EventHubSystemAdmin implements SystemAdmin {
               streamPartitions.put(streamName, ehInfo.getPartitionIds());
             } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
 
-              String msg = String.format("Error while fetching 
EventHubRuntimeInfo for System:%s, Stream:%s",
-                      systemName, streamName);
-              throw new SamzaException(msg);
+              String msg = String.format("Error while fetching 
EventHubRuntimeInfo for System:%s, Stream:%s", systemName,
+                  streamName);
+              LOG.error(msg, e);
+              throw new SamzaException(msg, e);
             }
           }
 
@@ -172,7 +173,8 @@ public class EventHubSystemAdmin implements SystemAdmin {
           String msg = String.format(
                   "Error while fetching EventHubPartitionRuntimeInfo for 
System:%s, Stream:%s, Partition:%s",
                   systemName, streamName, partitionId);
-          throw new SamzaException(msg);
+          LOG.error(msg, e);
+          throw new SamzaException(msg, e);
         }
       });
     return sspMetadataMap;

http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
index 1d8e0ce..0f512ac 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
@@ -46,15 +46,15 @@ public class MockEventHubConfigFactory {
     
mapConfig.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, 
SYSTEM_NAME), partitioningMethod.toString());
     mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
SYSTEM_NAME), STREAM_NAME1 + "," + STREAM_NAME2);
 
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
SYSTEM_NAME, STREAM_NAME1), EVENTHUB_NAMESPACE);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
SYSTEM_NAME, STREAM_NAME1), EVENTHUB_ENTITY1);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY);
-
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
SYSTEM_NAME, STREAM_NAME2), EVENTHUB_NAMESPACE);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
SYSTEM_NAME, STREAM_NAME2), EVENTHUB_ENTITY2);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
STREAM_NAME1), EVENTHUB_NAMESPACE);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
STREAM_NAME1), EVENTHUB_ENTITY1);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
STREAM_NAME1), EVENTHUB_KEY_NAME);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
STREAM_NAME1), EVENTHUB_KEY);
+
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
STREAM_NAME2), EVENTHUB_NAMESPACE);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
STREAM_NAME2), EVENTHUB_ENTITY2);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
STREAM_NAME2), EVENTHUB_KEY_NAME);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
STREAM_NAME2), EVENTHUB_KEY);
 
     return new MapConfig(mapConfig);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index 865a248..4fced77 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -83,10 +83,10 @@ public class TestEventHubSystemConsumer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName), MOCK_ENTITY_1);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
@@ -123,10 +123,10 @@ public class TestEventHubSystemConsumer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName), MOCK_ENTITY_1);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
@@ -173,10 +173,10 @@ public class TestEventHubSystemConsumer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName), MOCK_ENTITY_1);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
@@ -225,10 +225,10 @@ public class TestEventHubSystemConsumer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), MOCK_ENTITY_1);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName), EVENTHUB_KEY);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
@@ -282,14 +282,14 @@ public class TestEventHubSystemConsumer {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName),
             String.format("%s,%s", streamName1, streamName2));
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName1), MOCK_ENTITY_1);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName1), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName1), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName1), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName2), MOCK_ENTITY_2);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName2), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName2), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName2), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName1), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName1), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName1), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName1), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName2), MOCK_ENTITY_2);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName2), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName2), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName2), EVENTHUB_KEY);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);

http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index ef73775..8572e95 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -74,10 +74,10 @@ public class TestEventHubSystemProducer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName), EVENTHUB_ENTITY1);
     
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, 
systemName),
             PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
     MapConfig config = new MapConfig(configMap);
@@ -125,10 +125,10 @@ public class TestEventHubSystemProducer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName), EVENTHUB_ENTITY1);
     
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, 
systemName),
             PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
     MapConfig config = new MapConfig(configMap);
@@ -182,10 +182,10 @@ public class TestEventHubSystemProducer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName), EVENTHUB_ENTITY1);
 
     // mod 2 on the partitionid to simulate consistent hashing
     
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, 
systemName),

Reply via email to