Repository: samza Updated Branches: refs/heads/master 956cf412a -> 272aa32e3
Making event hub configs Samza compliant Fixes for Bugs - SAMZA-1571 Make Eventhubs system configs compatible with Samza standalone. - SAMZA-1624 EventHub system should prefix the configs with sensitive for SasKey and SasToken - SAMZA-1625 EventHub systemAdmin is swallowing exceptions - SAMZA-1626 EventHub system admin is not returning the metadata for all the requested SSPs Description 1. Right now event hub doesn't follow Samza's config convention of naming the secrets as "sensitive" so that they are masked before they are logged. 2. Event hub configs use the old systems.<systemName>.streams.<streamName> prefix, which is blacklisted in Samza standalone, so these configs are moved to the newer streams.<streamId> prefix. 3. Wrapping the underlying exception properly in the SamzaException in EventHubSystemAdmin 4. Porting Bharat's fix to return the metadata for all the requested SSPs in EventHubSystemAdmin Author: Srinivasulu Punuru <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #453 from srinipunuru/eh.1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/272aa32e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/272aa32e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/272aa32e Branch: refs/heads/master Commit: 272aa32e3045a7eb4b429e7790931a914317eef3 Parents: 956cf41 Author: Srinivasulu Punuru <[email protected]> Authored: Thu Mar 22 17:08:11 2018 -0700 Committer: xiliu <[email protected]> Committed: Thu Mar 22 17:08:11 2018 -0700 ---------------------------------------------------------------------- .../samza/system/eventhub/EventHubConfig.java | 30 ++++++------ .../eventhub/admin/EventHubSystemAdmin.java | 10 ++-- .../eventhub/MockEventHubConfigFactory.java | 18 ++++---- .../consumer/TestEventHubSystemConsumer.java | 48 ++++++++++---------- .../producer/TestEventHubSystemProducer.java | 24 +++++----- 5 files changed, 66 insertions(+), 64 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java index 5d83911..e9c383a 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java @@ -36,15 +36,15 @@ public class EventHubConfig extends MapConfig { public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list"; - public static final String CONFIG_STREAM_NAMESPACE = "systems.%s.streams.%s.eventhubs.namespace"; + public static final String CONFIG_STREAM_NAMESPACE = "streams.%s.eventhubs.namespace"; - public static final String CONFIG_STREAM_ENTITYPATH = "systems.%s.streams.%s.eventhubs.entitypath"; + public static final String CONFIG_STREAM_ENTITYPATH = "streams.%s.eventhubs.entitypath"; - public static final String CONFIG_STREAM_SAS_KEY_NAME = "systems.%s.streams.%s.eventhubs.sas.keyname"; + public static final String CONFIG_STREAM_SAS_KEY_NAME = Config.SENSITIVE_PREFIX + "streams.%s.eventhubs.sas.keyname"; - public static final String CONFIG_STREAM_SAS_TOKEN = "systems.%s.streams.%s.eventhubs.sas.token"; + public static final String CONFIG_STREAM_SAS_TOKEN = Config.SENSITIVE_PREFIX + "streams.%s.eventhubs.sas.token"; - public static final String CONFIG_STREAM_CONSUMER_GROUP = "systems.%s.streams.%s.eventhubs.consumer.group"; + public static final String CONFIG_STREAM_CONSUMER_GROUP = "streams.%s.eventhubs.consumer.group"; public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; public static final String CONFIG_PRODUCER_PARTITION_METHOD = 
"systems.%s.eventhubs.partition.method"; @@ -71,18 +71,18 @@ public class EventHubConfig extends MapConfig { .forEach((streamId) -> physcialToId.put(streamConfig.getPhysicalName(streamId), streamId)); } - private String getFromStreamIdOrName(String configName, String systemName, String streamName, String defaultString) { - String result = getFromStreamIdOrName(configName, systemName, streamName); + private String getFromStreamIdOrName(String configName, String streamName, String defaultString) { + String result = getFromStreamIdOrName(configName, streamName); if (result == null) { return defaultString; } return result; } - private String getFromStreamIdOrName(String configName, String systemName, String streamName) { + private String getFromStreamIdOrName(String configName, String streamName) { String streamId = getStreamId(streamName); - return get(String.format(configName, systemName, streamId), - streamId.equals(streamName) ? null : get(String.format(configName, systemName, streamName))); + return get(String.format(configName, streamId), + streamId.equals(streamName) ? 
null : get(String.format(configName, streamName))); } private String validateRequiredConfig(String value, String fieldName, String systemName, String streamName) { @@ -122,7 +122,7 @@ public class EventHubConfig extends MapConfig { * @return EventHubs namespace */ public String getStreamNamespace(String systemName, String streamName) { - return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, systemName, streamName), + return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, streamName), "Namespace", systemName, streamName); } @@ -134,7 +134,7 @@ public class EventHubConfig extends MapConfig { * @return EventHubs entity path */ public String getStreamEntityPath(String systemName, String streamName) { - return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, systemName, streamName), + return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, streamName), "EntityPath", systemName, streamName); } @@ -146,7 +146,7 @@ public class EventHubConfig extends MapConfig { * @return EventHubs SAS key name */ public String getStreamSasKeyName(String systemName, String streamName) { - return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), + return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, streamName), "SASKeyName", systemName, streamName); } @@ -158,7 +158,7 @@ public class EventHubConfig extends MapConfig { * @return EventHubs SAS token */ public String getStreamSasToken(String systemName, String streamName) { - return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, systemName, streamName), + return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, streamName), "SASToken", systemName, streamName); } @@ -170,7 +170,7 @@ public class EventHubConfig extends MapConfig { * @return EventHubs consumer group */ public String getStreamConsumerGroup(String systemName, String streamName) 
{ - return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, systemName, streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP); + return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java index 91d504c..5564747 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java @@ -104,9 +104,10 @@ public class EventHubSystemAdmin implements SystemAdmin { streamPartitions.put(streamName, ehInfo.getPartitionIds()); } catch (InterruptedException | ExecutionException | TimeoutException e) { - String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s", - systemName, streamName); - throw new SamzaException(msg); + String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s", systemName, + streamName); + LOG.error(msg, e); + throw new SamzaException(msg, e); } } @@ -172,7 +173,8 @@ public class EventHubSystemAdmin implements SystemAdmin { String msg = String.format( "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s", systemName, streamName, partitionId); - throw new SamzaException(msg); + LOG.error(msg, e); + throw new SamzaException(msg, e); } }); return sspMetadataMap; http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java 
---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java index 1d8e0ce..0f512ac 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java @@ -46,15 +46,15 @@ public class MockEventHubConfigFactory { mapConfig.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, SYSTEM_NAME), partitioningMethod.toString()); mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, SYSTEM_NAME), STREAM_NAME1 + "," + STREAM_NAME2); - mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_NAMESPACE); - mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_ENTITY1); - mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME); - mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY); - - mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_NAMESPACE); - mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_ENTITY2); - mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME); - mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY); + mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, STREAM_NAME1), EVENTHUB_NAMESPACE); + mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, STREAM_NAME1), EVENTHUB_ENTITY1); + 
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME); + mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, STREAM_NAME1), EVENTHUB_KEY); + + mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, STREAM_NAME2), EVENTHUB_NAMESPACE); + mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, STREAM_NAME2), EVENTHUB_ENTITY2); + mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME); + mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, STREAM_NAME2), EVENTHUB_KEY); return new MapConfig(mapConfig); } http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java index 865a248..4fced77 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java @@ -83,10 +83,10 @@ public class TestEventHubSystemConsumer { // Set configs Map<String, String> configMap = new HashMap<>(); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, 
streamName), MOCK_ENTITY_1); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1); MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); @@ -123,10 +123,10 @@ public class TestEventHubSystemConsumer { // Set configs Map<String, String> configMap = new HashMap<>(); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1); MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); @@ -173,10 +173,10 @@ public class TestEventHubSystemConsumer { // Set configs Map<String, String> configMap = new HashMap<>(); 
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1); MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); @@ -225,10 +225,10 @@ public class TestEventHubSystemConsumer { // Set configs Map<String, String> configMap = new HashMap<>(); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE); + 
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY); MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); @@ -282,14 +282,14 @@ public class TestEventHubSystemConsumer { Map<String, String> configMap = new HashMap<>(); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), String.format("%s,%s", streamName1, streamName2)); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName1), MOCK_ENTITY_1); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName1), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName1), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName1), EVENTHUB_KEY); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName2), MOCK_ENTITY_2); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName2), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName2), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName2), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName1), MOCK_ENTITY_1); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName1), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName1), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName1), EVENTHUB_KEY); + 
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName2), MOCK_ENTITY_2); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName2), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName2), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName2), EVENTHUB_KEY); MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java index ef73775..8572e95 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java @@ -74,10 +74,10 @@ public class TestEventHubSystemProducer { // Set configs Map<String, String> configMap = new HashMap<>(); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1); configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString()); MapConfig config = new MapConfig(configMap); @@ -125,10 +125,10 @@ public class TestEventHubSystemProducer { // Set configs Map<String, String> configMap = new HashMap<>(); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1); configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString()); MapConfig config = new MapConfig(configMap); @@ -182,10 +182,10 @@ public class TestEventHubSystemProducer { // Set configs Map<String, String> configMap = new HashMap<>(); 
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); - configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1); // mod 2 on the partitionid to simulate consistent hashing configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
