Repository: samza
Updated Branches:
  refs/heads/master e74998c5e -> 1d420e750


SAMZA-1532; Eventhub connector fix

Key fixes vjagadish1989 lhaiesp srinipunuru
- Switched Producer source vs destination assumptions in `send`, `register`
- Check `OME.key` if `OME.partitionId` is null for to get partitionId
- Upcoming offset changed the `END_OF_STREAM` rather than `newestOffset` + 1, 
eventHub returns an error if the offset does not exist in the system
- Made the NewestOffset+1 as upcoming offset, consumer checks if the offset is 
valid on startup
- Differentiated between streamNames and streamIds in configs, consumer, 
producer
- Checkpoint table named after job name
- Checkpoint prints better message for invalid key on write

QOL
- How to ignore integration tests
- Improved logging

EDIT:
- Also added Round Robin producer partitioning

Author: Daniel Chen <[email protected]>

Reviewers: Jagadish<[email protected]>

Closes #377 from dxichen/eventhub-connector-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1d420e75
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1d420e75
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1d420e75

Branch: refs/heads/master
Commit: 1d420e750eb8dc7f0715f7e11e01af1e9ce61506
Parents: e74998c
Author: Daniel Chen <[email protected]>
Authored: Mon Dec 11 16:45:44 2017 -0800
Committer: Jagadish <[email protected]>
Committed: Mon Dec 11 16:45:44 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |   5 -
 .../azure/AzureCheckpointManager.java           |  31 +++++-
 .../azure/AzureCheckpointManagerFactory.java    |   4 +-
 .../samza/system/eventhub/EventHubConfig.java   |  71 ++++++++++---
 .../eventhub/SamzaEventHubClientManager.java    |   4 +-
 .../eventhub/admin/EventHubSystemAdmin.java     |  17 +--
 .../consumer/EventHubSystemConsumer.java        |  77 ++++++++++----
 .../producer/EventHubSystemProducer.java        | 103 ++++++++++---------
 .../azure/ITestAzureCheckpointManager.java      |   2 +-
 .../MockEventHubClientManagerFactory.java       |  18 ++++
 .../eventhub/admin/TestEventHubSystemAdmin.java |  15 +--
 .../consumer/ITestEventHubSystemConsumer.java   |   5 +-
 .../consumer/TestEventHubSystemConsumer.java    |  19 ++--
 .../producer/ITestEventHubSystemProducer.java   |   2 +
 .../producer/TestEventHubSystemProducer.java    |  34 +++---
 15 files changed, 272 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index be1baf7..330ff0b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -213,11 +213,6 @@ project(':samza-azure') {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
     toolVersion = "$checkstyleVersion"
   }
-  test {
-    // Exclude integration tests that require connection to EventHub
-    exclude 'org/apache/samza/system/eventhub/producer/*ITest*'
-    exclude 'org/apache/samza/system/eventhub/consumer/*ITest*'
-  }
 }
 
 project(':samza-aws') {

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
index df3e490..2cad3bd 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
@@ -34,6 +34,7 @@ import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.SystemStreamPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import java.net.URISyntaxException;
 import java.util.HashMap;
@@ -60,12 +61,16 @@ public class AzureCheckpointManager implements 
CheckpointManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(AzureCheckpointManager.class.getName());
   private static final String PARTITION_KEY = "PartitionKey";
 
+  // Invalid characters in key field on Azure Table
+  public static final String REGEX_INVALID_KEY = ".*[#?/\\\\].*";
+  public static final String REGEX_TABLE_NAME = "[^A-Za-z0-9]";
+
   public static final int MAX_WRITE_BATCH_SIZE = 100;
-  public static final String CHECKPOINT_MANAGER_TABLE_NAME = 
"SamzaTaskCheckpoints";
   public static final String SYSTEM_PROP_NAME = "system";
   public static final String STREAM_PROP_NAME = "stream";
   public static final String PARTITION_PROP_NAME = "partition";
 
+  private final String jobTableName;
   private final String storageConnectionString;
   private final AzureClient azureClient;
   private CloudTable cloudTable;
@@ -73,7 +78,12 @@ public class AzureCheckpointManager implements 
CheckpointManager {
   private final Set<TaskName> taskNames = new HashSet<>();
   private final JsonSerdeV2<Map<String, String>> jsonSerde = new 
JsonSerdeV2<>();
 
-  AzureCheckpointManager(AzureConfig azureConfig) {
+  AzureCheckpointManager(AzureConfig azureConfig, Option<String> jobName) {
+    if (!jobName.isDefined()) {
+      throw new AzureException("Jobs must have a name to use Azure Checkpoint 
Manager");
+    }
+    // Remove invalid characters
+    jobTableName = jobName.get().replaceAll(REGEX_TABLE_NAME, "");
     storageConnectionString = azureConfig.getAzureConnectionString();
     azureClient = new AzureClient(storageConnectionString);
   }
@@ -82,7 +92,7 @@ public class AzureCheckpointManager implements 
CheckpointManager {
   public void start() {
     try {
       // Create the table if it doesn't exist.
-      cloudTable = 
azureClient.getTableClient().getTableReference(CHECKPOINT_MANAGER_TABLE_NAME);
+      cloudTable = 
azureClient.getTableClient().getTableReference(jobTableName);
       cloudTable.createIfNotExists();
 
     } catch (URISyntaxException e) {
@@ -115,9 +125,13 @@ public class AzureCheckpointManager implements 
CheckpointManager {
       SystemStreamPartition ssp = entry.getKey();
       String offset = entry.getValue();
 
+      String partitionKey = taskName.toString();
+      checkValidKey(partitionKey, "Taskname");
+      String rowKey = serializeSystemStreamPartition(ssp);
+      checkValidKey(rowKey, "SystemStreamPartition");
+
       // Create table entity
-      TaskCheckpointEntity taskCheckpoint = new 
TaskCheckpointEntity(taskName.toString(),
-              serializeSystemStreamPartition(ssp), offset);
+      TaskCheckpointEntity taskCheckpoint = new 
TaskCheckpointEntity(partitionKey, rowKey, offset);
 
       // Add to batch operation
       batchOperation.insertOrReplace(taskCheckpoint);
@@ -135,6 +149,13 @@ public class AzureCheckpointManager implements 
CheckpointManager {
     }
   }
 
+  private void checkValidKey(String key, String fieldUsed) {
+    if (key == null || key.matches(REGEX_INVALID_KEY)) {
+      throw new AzureException(String.format("Cannot insert to Azure 
Checkpoint Manager; %s %s contains invalid characters [*, /, \\\\, ?]",
+      fieldUsed, key));
+    }
+  }
+
   private String serializeSystemStreamPartition(SystemStreamPartition ssp) {
     // Create the Json string for SystemStreamPartition
     Map<String, String> sspMap = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
index 3c5d62a..95728e3 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
@@ -23,11 +23,13 @@ import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.checkpoint.CheckpointManagerFactory;
 import org.apache.samza.config.AzureConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.metrics.MetricsRegistry;
 
 public class AzureCheckpointManagerFactory implements CheckpointManagerFactory 
{
   @Override
   public CheckpointManager getCheckpointManager(Config config, MetricsRegistry 
registry) {
-    return new AzureCheckpointManager(new AzureConfig(config));
+    JobConfig jobConfig = new JobConfig(config);
+    return new AzureCheckpointManager(new AzureConfig(config), 
jobConfig.getName());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 3bc04f8..5d83911 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -20,10 +20,15 @@
 package org.apache.samza.system.eventhub;
 
 import com.microsoft.azure.eventhubs.EventHubClient;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
+import scala.collection.JavaConversions;
 
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -47,7 +52,7 @@ public class EventHubConfig extends MapConfig {
           .PartitioningMethod.EVENT_HUB_HASHING.name();
 
   public static final String CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = 
"systems.%s.eventhubs.send.key";
-  public static final Boolean DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = 
false;
+  public static final Boolean DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = 
true;
 
   public static final String CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS = 
"systems.%s.eventhubs.runtime.info.timeout";
   public static final long DEFAULT_CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS = 
Duration.ofMinutes(1L).toMillis();
@@ -55,9 +60,47 @@ public class EventHubConfig extends MapConfig {
   public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = 
"systems.%s.eventhubs.receive.queue.size";
   public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100;
 
+  private final Map<String, String> physcialToId = new HashMap<>();
 
-  public EventHubConfig(Map<String, String> config) {
+  public EventHubConfig(Config config) {
     super(config);
+
+    // Build reverse index for streamName -> streamId
+    StreamConfig streamConfig = new StreamConfig(config);
+    JavaConversions.asJavaCollection(streamConfig.getStreamIds())
+            .forEach((streamId) -> 
physcialToId.put(streamConfig.getPhysicalName(streamId), streamId));
+  }
+
+  private String getFromStreamIdOrName(String configName, String systemName, 
String streamName, String defaultString) {
+    String result = getFromStreamIdOrName(configName, systemName, streamName);
+    if (result == null) {
+      return defaultString;
+    }
+    return result;
+  }
+
+  private String getFromStreamIdOrName(String configName, String systemName, 
String streamName) {
+    String streamId = getStreamId(streamName);
+    return get(String.format(configName, systemName, streamId),
+            streamId.equals(streamName) ? null : get(String.format(configName, 
systemName, streamName)));
+  }
+
+  private String validateRequiredConfig(String value, String fieldName, String 
systemName, String streamName) {
+    if (value == null) {
+      throw new SamzaException(String.format("Missing %s configuration for 
system: %s, stream: %s",
+              fieldName, systemName, streamName));
+    }
+    return value;
+  }
+
+  /**
+   * Get the streamId for the specified streamName
+   *
+   * @param streamName the physical identifier of a stream
+   * @return the streamId identifier for the stream or the queried streamName 
if it is not found.
+   */
+  public String getStreamId(String streamName) {
+    return physcialToId.getOrDefault(streamName, streamName);
   }
 
   /**
@@ -75,55 +118,59 @@ public class EventHubConfig extends MapConfig {
    * Get the EventHubs namespace for the stream
    *
    * @param systemName name of the system
-   * @param streamName name of stream
+   * @param streamName name of stream (physical or streamId)
    * @return EventHubs namespace
    */
   public String getStreamNamespace(String systemName, String streamName) {
-    return get(String.format(CONFIG_STREAM_NAMESPACE, systemName, streamName));
+    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, 
systemName, streamName),
+            "Namespace", systemName, streamName);
   }
 
   /**
    * Get the EventHubs entity path (topic name) for the stream
    *
    * @param systemName name of the system
-   * @param streamName name of stream
+   * @param streamName name of stream (physical or streamId)
    * @return EventHubs entity path
    */
   public String getStreamEntityPath(String systemName, String streamName) {
-    return get(String.format(CONFIG_STREAM_ENTITYPATH, systemName, 
streamName));
+    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, 
systemName, streamName),
+            "EntityPath", systemName, streamName);
   }
 
   /**
    * Get the EventHubs SAS (Shared Access Signature) key name for the stream
    *
    * @param systemName name of the system
-   * @param streamName name of stream
+   * @param streamName name of stream (physical or streamId)
    * @return EventHubs SAS key name
    */
   public String getStreamSasKeyName(String systemName, String streamName) {
-    return get(String.format(CONFIG_STREAM_SAS_KEY_NAME, systemName, 
streamName));
+    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName),
+            "SASKeyName", systemName, streamName);
   }
 
   /**
    * Get the EventHubs SAS (Shared Access Signature) token for the stream
    *
    * @param systemName name of the system
-   * @param streamName name of stream
+   * @param streamName name of stream (physical or streamId)
    * @return EventHubs SAS token
    */
   public String getStreamSasToken(String systemName, String streamName) {
-    return get(String.format(CONFIG_STREAM_SAS_TOKEN, systemName, streamName));
+    return 
validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName),
+            "SASToken", systemName, streamName);
   }
 
   /**
    * Get the EventHubs consumer group used for consumption for the stream
    *
    * @param systemName name of the system
-   * @param streamName name of stream
+   * @param streamName name of stream (physical or streamId)
    * @return EventHubs consumer group
    */
   public String getStreamConsumerGroup(String systemName, String streamName) {
-    return get(String.format(CONFIG_STREAM_CONSUMER_GROUP, systemName, 
streamName), DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
+    return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, systemName, 
streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
index ada5694..977e252 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
@@ -75,8 +75,8 @@ public class SamzaEventHubClientManager implements 
EventHubClientManager {
 
       eventHubClient = 
EventHubClient.createFromConnectionStringSync(connectionStringBuilder.toString(),
 retryPolicy);
     } catch (IOException | ServiceBusException e) {
-      String msg = String.format("Creation of EventHub client failed for 
eventHub %s %s %s %s on remote host %s:%d",
-              entityPath, eventHubNamespace, sasKeyName, sasKey, remoteHost, 
ClientConstants.AMQPS_PORT);
+      String msg = String.format("Creation of EventHub client failed for 
eventHub EntityPath: %s on remote host %s:%d",
+              entityPath, remoteHost, ClientConstants.AMQPS_PORT);
       LOG.error(msg, e);
       throw new SamzaException(msg, e);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 11998a4..91d504c 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -63,8 +63,8 @@ public class EventHubSystemAdmin implements SystemAdmin {
   private String getNextOffset(String currentOffset) {
     // EventHub will return the first message AFTER the offset
     // that was specified in the fetch request.
-    return currentOffset.equals(EventHubSystemConsumer.END_OF_STREAM) ? 
currentOffset :
-            String.valueOf(Long.parseLong(currentOffset) + 1);
+    // If no such offset exists Eventhub will return an error.
+    return String.valueOf(Long.parseLong(currentOffset) + 1);
   }
 
   @Override
@@ -158,6 +158,7 @@ public class EventHubSystemAdmin implements SystemAdmin {
           long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
           EventHubPartitionRuntimeInformation ehPartitionInfo = 
ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS);
 
+          // Set offsets
           String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
           String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
           String upcomingOffset = getNextOffset(newestOffset);
@@ -181,12 +182,14 @@ public class EventHubSystemAdmin implements SystemAdmin {
     if (offset1 == null || offset2 == null) {
       return null;
     }
+    // Should NOT be able to compare with END_OF_STREAM to allow new offsets 
to be
+    // considered caught up if stream started at END_OF_STREAM offset
+    if (EventHubSystemConsumer.END_OF_STREAM.equals(offset1) ||
+            EventHubSystemConsumer.END_OF_STREAM.equals(offset2)) {
+      return null;
+    }
     try {
-      if (offset1.equals(EventHubSystemConsumer.END_OF_STREAM)) {
-        return offset2.equals(EventHubSystemConsumer.END_OF_STREAM) ? 0 : 1;
-      }
-      return offset2.equals(EventHubSystemConsumer.END_OF_STREAM) ? -1 :
-              Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
+      return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
     } catch (NumberFormatException exception) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 4de34de..90c73dc 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -20,6 +20,7 @@
 package org.apache.samza.system.eventhub.consumer;
 
 import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation;
 import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
 import com.microsoft.azure.servicebus.ServiceBusException;
@@ -34,6 +35,7 @@ import org.apache.samza.system.eventhub.EventHubConfig;
 import org.apache.samza.system.eventhub.Interceptor;
 import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
 import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
 import org.apache.samza.util.BlockingEnvelopeMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,23 +142,23 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
     this.config = config;
     this.systemName = systemName;
     this.interceptors = interceptors;
-    List<String> streamNames = config.getStreams(systemName);
+    List<String> streamIds = config.getStreams(systemName);
     // Create and initiate connections to Event Hubs
-    for (String streamName : streamNames) {
+    for (String streamId : streamIds) {
       EventHubClientManager eventHubClientManager = 
eventHubClientManagerFactory
-              .getEventHubClientManager(systemName, streamName, config);
-      streamEventHubManagers.put(streamName, eventHubClientManager);
+              .getEventHubClientManager(systemName, streamId, config);
+      streamEventHubManagers.put(streamId, eventHubClientManager);
       eventHubClientManager.init();
     }
 
     // Initiate metrics
-    eventReadRates = streamNames.stream()
+    eventReadRates = streamIds.stream()
             .collect(Collectors.toMap(Function.identity(), x -> 
registry.newCounter(x, EVENT_READ_RATE)));
-    eventByteReadRates = streamNames.stream()
+    eventByteReadRates = streamIds.stream()
             .collect(Collectors.toMap(Function.identity(), x -> 
registry.newCounter(x, EVENT_BYTE_READ_RATE)));
-    readLatencies = streamNames.stream()
+    readLatencies = streamIds.stream()
             .collect(Collectors.toMap(Function.identity(), x -> new 
SamzaHistogram(registry, x, READ_LATENCY)));
-    readErrors = streamNames.stream()
+    readErrors = streamIds.stream()
             .collect(Collectors.toMap(Function.identity(), x -> 
registry.newCounter(x, READ_ERRORS)));
 
     // Locking to ensure that these aggregated metrics will be created only 
once across multiple system consumers.
@@ -174,20 +176,41 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
   public void register(SystemStreamPartition systemStreamPartition, String 
offset) {
     super.register(systemStreamPartition, offset);
 
+    LOG.info(String.format("Eventhub consumer trying to register ssp %s, 
offset %s", systemStreamPartition, offset));
     if (isStarted) {
       throw new SamzaException("Trying to add partition when the connection 
has already started.");
     }
 
     if (streamPartitionOffsets.containsKey(systemStreamPartition)) {
+      // Only update if new offset is lower than previous offset
+      if (END_OF_STREAM.equals(offset)) return;
       String prevOffset = streamPartitionOffsets.get(systemStreamPartition);
-      if (EventHubSystemAdmin.compareOffsets(offset, prevOffset) > -1) {
-        // Only update if new offset is lower than previous offset
+      if (!END_OF_STREAM.equals(prevOffset) && 
EventHubSystemAdmin.compareOffsets(offset, prevOffset) > -1) {
         return;
       }
     }
     streamPartitionOffsets.put(systemStreamPartition, offset);
   }
 
+  private String getNewestEventHubOffset(EventHubClientManager 
eventHubClientManager, String streamName, Integer partitionId) {
+    CompletableFuture<EventHubPartitionRuntimeInformation> 
partitionRuntimeInfoFuture = eventHubClientManager
+            .getEventHubClient()
+            .getPartitionRuntimeInformation(partitionId.toString());
+    try {
+      long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
+
+      EventHubPartitionRuntimeInformation partitionRuntimeInformation = 
partitionRuntimeInfoFuture
+              .get(timeoutMs, TimeUnit.MILLISECONDS);
+
+      return partitionRuntimeInformation.getLastEnqueuedOffset();
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      String msg = String.format(
+              "Error while fetching EventHubPartitionRuntimeInfo for 
System:%s, Stream:%s, Partition:%s",
+              systemName, streamName, partitionId);
+      throw new SamzaException(msg);
+    }
+  }
+
   @Override
   public void start() {
     isStarted = true;
@@ -196,27 +219,36 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
 
       SystemStreamPartition ssp = entry.getKey();
       String streamName = ssp.getStream();
+      String streamId = config.getStreamId(ssp.getStream());
       Integer partitionId = ssp.getPartition().getPartitionId();
       String offset = entry.getValue();
-      String consumerGroup = config.getStreamConsumerGroup(systemName, 
streamName);
-      String namespace = config.getStreamNamespace(systemName, streamName);
-      String entityPath = config.getStreamEntityPath(systemName, streamName);
-      EventHubClientManager eventHubClientManager = 
streamEventHubManagers.get(streamName);
+      String consumerGroup = config.getStreamConsumerGroup(systemName, 
streamId);
+      String namespace = config.getStreamNamespace(systemName, streamId);
+      String entityPath = config.getStreamEntityPath(systemName, streamId);
+      EventHubClientManager eventHubClientManager = 
streamEventHubManagers.get(streamId);
 
       try {
+        // Fetch the newest offset
+        String newestEventHubOffset = 
getNewestEventHubOffset(eventHubClientManager, streamName, partitionId);
         PartitionReceiver receiver;
-        if (offset.equals(EventHubSystemConsumer.END_OF_STREAM)) {
+        if (END_OF_STREAM.equals(offset) || 
EventHubSystemAdmin.compareOffsets(newestEventHubOffset, offset) == -1) {
+          // If the offset is greater than the newest offset, use the use 
current Instant as
+          // offset to fetch in Eventhub.
           receiver = eventHubClientManager.getEventHubClient()
                   .createReceiverSync(consumerGroup, partitionId.toString(), 
Instant.now());
         } else {
+          // If the offset is less or equal to the newest offset in the 
system, it can be
+          // used as the starting offset to receive from. EventHub will return 
the first
+          // message AFTER the offset that was specified in the fetch request.
+          // If no such offset exists Eventhub will return an error.
           receiver = eventHubClientManager.getEventHubClient()
                   .createReceiverSync(consumerGroup, partitionId.toString(), 
offset,
                           
!offset.equals(EventHubSystemConsumer.START_OF_STREAM));
         }
 
-        PartitionReceiveHandler handler = new 
PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamName),
-                eventByteReadRates.get(streamName), 
readLatencies.get(streamName), readErrors.get(streamName),
-                interceptors.getOrDefault(streamName, null));
+        PartitionReceiveHandler handler = new 
PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId),
+                eventByteReadRates.get(streamId), readLatencies.get(streamId), 
readErrors.get(streamId),
+                interceptors.getOrDefault(streamId, null));
 
 
         // Timeout for EventHubClient receive
@@ -261,11 +293,11 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
   }
 
   private void renewPartitionReceiver(SystemStreamPartition ssp) {
-
-    EventHubClientManager eventHubClientManager = 
streamEventHubManagers.get(ssp.getStream());
+    String streamId = config.getStreamId(ssp.getStream());
+    EventHubClientManager eventHubClientManager = 
streamEventHubManagers.get(streamId);
     String offset = streamPartitionOffsets.get(ssp);
     Integer partitionId = ssp.getPartition().getPartitionId();
-    String consumerGroup = config.getStreamConsumerGroup(ssp.getSystem(), 
ssp.getStream());
+    String consumerGroup = config.getStreamConsumerGroup(ssp.getSystem(), 
streamId);
 
     try {
       // Close current receiver
@@ -346,6 +378,9 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
             }
             String offset = event.getSystemProperties().getOffset();
             Object partitionKey = 
event.getSystemProperties().getPartitionKey();
+            if (partitionKey == null) {
+              partitionKey = 
event.getProperties().get(EventHubSystemProducer.KEY);
+            }
             try {
               updateMetrics(event);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 505421c..f294751 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -58,6 +58,7 @@ public class EventHubSystemProducer implements SystemProducer 
{
   private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = 
Duration.ofMinutes(1L).toMillis();
 
   public static final String PRODUCE_TIMESTAMP = "produce-timestamp";
+  public static final String KEY = "key";
 
   // Metrics recording
   private static final String EVENT_WRITE_RATE = "eventWriteRate";
@@ -75,8 +76,9 @@ public class EventHubSystemProducer implements SystemProducer 
{
   private static final Object AGGREGATE_METRICS_LOCK = new Object();
 
   public enum PartitioningMethod {
+    ROUND_ROBIN,
     EVENT_HUB_HASHING,
-    PARTITION_KEY_AS_PARTITION,
+    PARTITION_KEY_AS_PARTITION
   }
 
   private final HashMap<String, Counter> eventWriteRate = new HashMap<>();
@@ -85,7 +87,6 @@ public class EventHubSystemProducer implements SystemProducer 
{
   private final HashMap<String, SamzaHistogram> sendCallbackLatency = new 
HashMap<>();
   private final HashMap<String, Counter> sendErrors = new HashMap<>();
 
-  private final EventHubClientManagerFactory eventHubClientManagerFactory;
   private final EventHubConfig config;
   private final MetricsRegistry registry;
   private final PartitioningMethod partitioningMethod;
@@ -109,36 +110,35 @@ public class EventHubSystemProducer implements 
SystemProducer {
     this.registry = registry;
     this.systemName = systemName;
     this.partitioningMethod = config.getPartitioningMethod(systemName);
-    this.eventHubClientManagerFactory = eventHubClientManagerFactory;
     this.interceptors = interceptors;
+
+    // Fetches the stream ids
+    List<String> streamIds = config.getStreams(systemName);
+
+    // Create and initiate connections to Event Hubs
+    for (String streamId : streamIds) {
+      EventHubClientManager ehClient = 
eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, 
config);
+      eventHubClients.put(streamId, ehClient);
+      ehClient.init();
+    }
   }
 
   @Override
-  public synchronized void register(String streamName) {
-    LOG.debug("Trying to register {}.", streamName);
+  public synchronized void register(String source) {
     if (isStarted) {
       String msg = "Cannot register once the producer is started.";
       throw new SamzaException(msg);
     }
-
-    if (eventHubClients.containsKey(streamName)) {
-      LOG.warn("Already registered stream {}.", streamName);
-      return;
-    }
-
-    EventHubClientManager ehClient = 
eventHubClientManagerFactory.getEventHubClientManager(systemName, streamName, 
config);
-
-    ehClient.init();
-    eventHubClients.put(streamName, ehClient);
   }
 
   @Override
   public synchronized void start() {
     LOG.debug("Starting system producer.");
 
+    // Create partition senders if required
     if 
(PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
       // Create all partition senders
-      eventHubClients.forEach((streamName, samzaEventHubClient) -> {
+      eventHubClients.forEach((streamId, samzaEventHubClient) -> {
           EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
 
           try {
@@ -153,7 +153,7 @@ public class EventHubSystemProducer implements 
SystemProducer {
               partitionSenders.put(i, partitionSender);
             }
 
-            streamPartitionSenders.put(streamName, partitionSenders);
+            streamPartitionSenders.put(streamId, partitionSenders);
           } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
             String msg = "Failed to fetch number of Event Hub partitions for 
partition sender creation";
             throw new SamzaException(msg, e);
@@ -164,15 +164,16 @@ public class EventHubSystemProducer implements 
SystemProducer {
         });
     }
 
-    for (String eventHub : eventHubClients.keySet()) {
-      eventWriteRate.put(eventHub, registry.newCounter(eventHub, 
EVENT_WRITE_RATE));
-      eventByteWriteRate.put(eventHub, registry.newCounter(eventHub, 
EVENT_BYTE_WRITE_RATE));
-      sendLatency.put(eventHub, new SamzaHistogram(registry, eventHub, 
SEND_LATENCY));
-      sendCallbackLatency.put(eventHub, new SamzaHistogram(registry, eventHub, 
SEND_CALLBACK_LATENCY));
-      sendErrors.put(eventHub, registry.newCounter(eventHub, SEND_ERRORS));
-    }
+    // Initiate metrics
+    eventHubClients.keySet().forEach((streamId) -> {
+        eventWriteRate.put(streamId, registry.newCounter(streamId, 
EVENT_WRITE_RATE));
+        eventByteWriteRate.put(streamId, registry.newCounter(streamId, 
EVENT_BYTE_WRITE_RATE));
+        sendLatency.put(streamId, new SamzaHistogram(registry, streamId, 
SEND_LATENCY));
+        sendCallbackLatency.put(streamId, new SamzaHistogram(registry, 
streamId, SEND_CALLBACK_LATENCY));
+        sendErrors.put(streamId, registry.newCounter(streamId, SEND_ERRORS));
+      });
 
-    // Locking to ensure that these aggregated metrics will be created only 
once across multiple system consumers.
+    // Locking to ensure that these aggregated metrics will be created only 
once across multiple system producers.
     synchronized (AGGREGATE_METRICS_LOCK) {
       if (aggEventWriteRate == null) {
         aggEventWriteRate = registry.newCounter(AGGREGATE, EVENT_WRITE_RATE);
@@ -187,35 +188,38 @@ public class EventHubSystemProducer implements 
SystemProducer {
   }
 
   @Override
-  public synchronized void send(String destination, OutgoingMessageEnvelope 
envelope) {
+  public synchronized void send(String source, OutgoingMessageEnvelope 
envelope) {
+    LOG.debug(String.format("Trying to send %s", envelope));
     if (!isStarted) {
       throw new SamzaException("Trying to call send before the producer is 
started.");
     }
 
-    if (!eventHubClients.containsKey(destination)) {
-      String msg = String.format("Trying to send event to a destination {%s} 
that is not registered.", destination);
+    String streamId = 
config.getStreamId(envelope.getSystemStream().getStream());
+
+    if (!eventHubClients.containsKey(streamId)) {
+      String msg = String.format("Trying to send event to a destination {%s} 
that is not registered.", streamId);
       throw new SamzaException(msg);
     }
 
     checkCallbackThrowable("Received exception on message send");
 
-    EventData eventData = createEventData(destination, envelope);
+    EventData eventData = createEventData(streamId, envelope);
     int eventDataLength =  eventData.getBytes() == null ? 0 : 
eventData.getBytes().length;
-    eventWriteRate.get(destination).inc();
+    eventWriteRate.get(streamId).inc();
     aggEventWriteRate.inc();
-    eventByteWriteRate.get(destination).inc(eventDataLength);
+    eventByteWriteRate.get(streamId).inc(eventDataLength);
     aggEventByteWriteRate.inc(eventDataLength);
-    EventHubClientManager ehClient = eventHubClients.get(destination);
+    EventHubClientManager ehClient = eventHubClients.get(streamId);
 
     long beforeSendTimeMs = System.currentTimeMillis();
 
     // Async send call
-    CompletableFuture<Void> sendResult = sendToEventHub(destination, 
eventData, getEnvelopePartitionId(envelope),
+    CompletableFuture<Void> sendResult = sendToEventHub(streamId, eventData, 
getEnvelopePartitionId(envelope),
             ehClient.getEventHubClient());
 
     long afterSendTimeMs = System.currentTimeMillis();
     long latencyMs = afterSendTimeMs - beforeSendTimeMs;
-    sendLatency.get(destination).update(latencyMs);
+    sendLatency.get(streamId).update(latencyMs);
     aggSendLatency.update(latencyMs);
 
     pendingFutures.add(sendResult);
@@ -223,32 +227,37 @@ public class EventHubSystemProducer implements 
SystemProducer {
     // Auto update the metrics and possible throwable when futures are 
complete.
     sendResult.handle((aVoid, throwable) -> {
         long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
-        sendCallbackLatency.get(destination).update(callbackLatencyMs);
+        sendCallbackLatency.get(streamId).update(callbackLatencyMs);
         aggSendCallbackLatency.update(callbackLatencyMs);
         if (throwable != null) {
-          sendErrors.get(destination).inc();
+          sendErrors.get(streamId).inc();
           aggSendErrors.inc();
-          LOG.error("Send message to event hub: {} failed with exception: ", 
destination, throwable);
+          LOG.error("Send message to event hub: {} failed with exception: ", 
streamId, throwable);
           sendExceptionOnCallback.compareAndSet(null, throwable);
         }
         return aVoid;
       });
   }
 
-  private CompletableFuture<Void> sendToEventHub(String streamName, EventData 
eventData, Object partitionKey,
+  private CompletableFuture<Void> sendToEventHub(String streamId, EventData 
eventData, Object partitionKey,
                                                  EventHubClient 
eventHubClient) {
-    if (partitioningMethod == PartitioningMethod.EVENT_HUB_HASHING) {
+    if (PartitioningMethod.ROUND_ROBIN.equals(partitioningMethod)) {
+      return eventHubClient.send(eventData);
+    } else if 
(PartitioningMethod.EVENT_HUB_HASHING.equals(partitioningMethod)) {
+      if (partitionKey == null) {
+        throw new SamzaException("Partition key cannot be null for EventHub 
hashing");
+      }
       return eventHubClient.send(eventData, 
convertPartitionKeyToString(partitionKey));
-    } else if (partitioningMethod == 
PartitioningMethod.PARTITION_KEY_AS_PARTITION) {
+    } else if 
(PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
       if (!(partitionKey instanceof Integer)) {
         String msg = "Partition key should be of type Integer";
         throw new SamzaException(msg);
       }
 
-      Integer numPartition = streamPartitionSenders.get(streamName).size();
+      Integer numPartition = streamPartitionSenders.get(streamId).size();
       Integer destinationPartition = (Integer) partitionKey % numPartition;
 
-      PartitionSender sender = 
streamPartitionSenders.get(streamName).get(destinationPartition);
+      PartitionSender sender = 
streamPartitionSenders.get(streamId).get(destinationPartition);
       return sender.send(eventData);
     } else {
       throw new SamzaException("Unknown partitioning method " + 
partitioningMethod);
@@ -256,7 +265,7 @@ public class EventHubSystemProducer implements 
SystemProducer {
   }
 
   protected Object getEnvelopePartitionId(OutgoingMessageEnvelope envelope) {
-    return envelope.getPartitionKey();
+    return envelope.getPartitionKey() == null ? envelope.getKey() : 
envelope.getPartitionKey();
   }
 
   private String convertPartitionKeyToString(Object partitionKey) {
@@ -267,12 +276,12 @@ public class EventHubSystemProducer implements 
SystemProducer {
     } else if (partitionKey instanceof byte[]) {
       return new String((byte[]) partitionKey, Charset.defaultCharset());
     } else {
-      throw new SamzaException("Unsupported key type: " + 
partitionKey.getClass().toString());
+      throw new SamzaException("Unsupported key type: " +  
partitionKey.getClass().toString());
     }
   }
 
-  protected EventData createEventData(String streamName, 
OutgoingMessageEnvelope envelope) {
-    Optional<Interceptor> interceptor = 
Optional.ofNullable(interceptors.getOrDefault(streamName, null));
+  protected EventData createEventData(String streamId, OutgoingMessageEnvelope 
envelope) {
+    Optional<Interceptor> interceptor = 
Optional.ofNullable(interceptors.getOrDefault(streamId, null));
     byte[] eventValue = (byte[]) envelope.getMessage();
     if (interceptor.isPresent()) {
       eventValue = interceptor.get().intercept(eventValue);
@@ -288,7 +297,7 @@ public class EventHubSystemProducer implements 
SystemProducer {
         keyValue = (envelope.getKey() instanceof byte[]) ? new String((byte[]) 
envelope.getKey())
                 : envelope.getKey().toString();
       }
-      eventData.getProperties().put("key", keyValue);
+      eventData.getProperties().put(KEY, keyValue);
     }
     return eventData;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
 
b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
index 3e5ead0..4560b11 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
@@ -34,7 +34,7 @@ import org.junit.*;
 import java.util.HashMap;
 import java.util.Map;
 
-@Ignore("Intergration Test")
+@Ignore("Requires Azure account credentials")
 public class ITestAzureCheckpointManager {
 
   private static String storageConnectionString = "";

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
index 1f06f7d..368087a 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
@@ -101,6 +101,10 @@ public class MockEventHubClientManagerFactory extends 
EventHubClientManagerFacto
           }
           return null;
         });
+      EventHubPartitionRuntimeInformation mockPartitionRuntimeInfo = 
PowerMockito.mock(EventHubPartitionRuntimeInformation.class);
+      PowerMockito.when(mockPartitionRuntimeInfo.getLastEnqueuedOffset())
+              .thenReturn(EventHubSystemConsumer.START_OF_STREAM);
+      CompletableFuture<EventHubPartitionRuntimeInformation> partitionFuture = 
 new MockPartitionFuture(mockPartitionRuntimeInfo);
 
       // Producer mocks
       PartitionSender mockPartitionSender0 = 
PowerMockito.mock(PartitionSender.class);
@@ -137,6 +141,7 @@ public class MockEventHubClientManagerFactory extends 
EventHubClientManagerFacto
                     startingOffsets.put(partitionId, offset);
                     return mockPartitionReceiver;
                   });
+        
PowerMockito.when(mockEventHubClient.getPartitionRuntimeInformation(anyString())).thenReturn(partitionFuture);
 
         // Producer calls
         
PowerMockito.when(mockEventHubClient.createPartitionSenderSync("0")).thenReturn(mockPartitionSender0);
@@ -191,6 +196,19 @@ public class MockEventHubClientManagerFactory extends 
EventHubClientManagerFacto
       }
     }
 
+    private class MockPartitionFuture extends 
CompletableFuture<EventHubPartitionRuntimeInformation> {
+      EventHubPartitionRuntimeInformation runtimeInformation;
+
+      MockPartitionFuture(EventHubPartitionRuntimeInformation 
runtimeInformation) {
+        this.runtimeInformation = runtimeInformation;
+      }
+
+      @Override
+      public EventHubPartitionRuntimeInformation get(long timeout, TimeUnit 
unit) {
+        return runtimeInformation;
+      }
+    }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
index c49e68e..8861152 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
@@ -49,14 +49,8 @@ public class TestEventHubSystemAdmin {
     Assert.assertEquals(0, eventHubSystemAdmin.offsetComparator("150", 
"150").intValue());
     Assert.assertEquals(1, eventHubSystemAdmin.offsetComparator("200", 
"100").intValue());
     Assert.assertNull(eventHubSystemAdmin.offsetComparator("1", "a"));
-    Assert.assertEquals(-1, eventHubSystemAdmin
-            .offsetComparator("100", 
EventHubSystemConsumer.END_OF_STREAM).intValue());
-    Assert.assertEquals(0, 
eventHubSystemAdmin.offsetComparator(EventHubSystemConsumer.END_OF_STREAM,
-            EventHubSystemConsumer.END_OF_STREAM).intValue());
-    Assert.assertEquals(1, eventHubSystemAdmin
-            .offsetComparator(EventHubSystemConsumer.END_OF_STREAM, 
"100").intValue());
-    Assert.assertEquals(-1, eventHubSystemAdmin
-            .offsetComparator(EventHubSystemConsumer.START_OF_STREAM, 
"10").intValue());
+    Assert.assertNull(eventHubSystemAdmin.offsetComparator("100", 
EventHubSystemConsumer.END_OF_STREAM));
+    
Assert.assertNull(eventHubSystemAdmin.offsetComparator(EventHubSystemConsumer.END_OF_STREAM,
 EventHubSystemConsumer.END_OF_STREAM));
   }
 
   @Test
@@ -66,16 +60,13 @@ public class TestEventHubSystemAdmin {
             
MockEventHubConfigFactory.getEventHubConfig(EventHubSystemProducer.PartitioningMethod.EVENT_HUB_HASHING));
     Map<SystemStreamPartition, String> offsets = new HashMap<>();
     SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM_NAME, 
STREAM_NAME1, new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM_NAME, 
STREAM_NAME1, new Partition(1));
     SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM_NAME, 
STREAM_NAME1, new Partition(2));
     offsets.put(ssp0, Integer.toString(0));
-    offsets.put(ssp1, EventHubSystemConsumer.END_OF_STREAM);
     offsets.put(ssp2, EventHubSystemConsumer.START_OF_STREAM);
 
     Map<SystemStreamPartition, String> updatedOffsets = 
eventHubSystemAdmin.getOffsetsAfter(offsets);
     Assert.assertEquals(offsets.size(), updatedOffsets.size());
     Assert.assertEquals("1", updatedOffsets.get(ssp0));
-    Assert.assertEquals("-2", updatedOffsets.get(ssp1));
     Assert.assertEquals("0", updatedOffsets.get(ssp2));
   }
 
@@ -102,8 +93,6 @@ public class TestEventHubSystemAdmin {
       partitionMetadataMap.forEach((partition, metadata) -> {
           Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, 
metadata.getOldestOffset());
           Assert.assertNotSame(EventHubSystemConsumer.END_OF_STREAM, 
metadata.getNewestOffset());
-          
Assert.assertTrue(Long.parseLong(EventHubSystemConsumer.END_OF_STREAM)
-                  <= Long.parseLong(metadata.getNewestOffset()));
           String expectedUpcomingOffset = 
String.valueOf(Long.parseLong(metadata.getNewestOffset()) + 1);
           Assert.assertEquals(expectedUpcomingOffset, 
metadata.getUpcomingOffset());
         });

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java
index dbf8807..cfd8217 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java
@@ -29,14 +29,15 @@ import 
org.apache.samza.system.eventhub.MockEventHubConfigFactory;
 import org.apache.samza.system.eventhub.TestMetricsRegistry;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 import static 
org.apache.samza.system.eventhub.MockEventHubConfigFactory.STREAM_NAME1;
 import static 
org.apache.samza.system.eventhub.MockEventHubConfigFactory.SYSTEM_NAME;
 
+@Ignore("Requires Azure account credentials")
 public class ITestEventHubSystemConsumer {
 
   private Config createEventHubConfig() {

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index a25a3b6..865a248 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -22,6 +22,7 @@ package org.apache.samza.system.eventhub.consumer;
 
 import com.microsoft.azure.eventhubs.*;
 import org.apache.samza.Partition;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
@@ -41,7 +42,8 @@ import java.util.stream.Collectors;
 import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({EventHubRuntimeInformation.class, EventHubClient.class, 
PartitionReceiver.class, PartitionSender.class})
+@PrepareForTest({EventHubRuntimeInformation.class, 
EventHubPartitionRuntimeInformation.class,
+        EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
 public class TestEventHubSystemConsumer {
   private static final String MOCK_ENTITY_1 = "mocktopic1";
   private static final String MOCK_ENTITY_2 = "mocktopic2";
@@ -85,11 +87,12 @@ public class TestEventHubSystemConsumer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), MOCK_ENTITY_1);
+    MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
 
     EventHubSystemConsumer consumer =
-            new EventHubSystemConsumer(new EventHubConfig(configMap), 
systemName, eventHubClientWrapperFactory, interceptors,
+            new EventHubSystemConsumer(new EventHubConfig(config), systemName, 
eventHubClientWrapperFactory, interceptors,
                     testMetrics);
     consumer.register(ssp, "1");
     consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM);
@@ -124,11 +127,12 @@ public class TestEventHubSystemConsumer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), MOCK_ENTITY_1);
+    MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
 
     EventHubSystemConsumer consumer =
-            new EventHubSystemConsumer(new EventHubConfig(configMap), 
systemName, eventHubClientWrapperFactory, interceptors,
+            new EventHubSystemConsumer(new EventHubConfig(config), systemName, 
eventHubClientWrapperFactory, interceptors,
                     testMetrics);
     consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM);
     consumer.start();
@@ -173,11 +177,12 @@ public class TestEventHubSystemConsumer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), MOCK_ENTITY_1);
+    MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
 
     EventHubSystemConsumer consumer =
-            new EventHubSystemConsumer(new EventHubConfig(configMap), 
systemName, eventHubClientWrapperFactory, interceptors,
+            new EventHubSystemConsumer(new EventHubConfig(config), systemName, 
eventHubClientWrapperFactory, interceptors,
                     testMetrics);
     consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM);
     consumer.start();
@@ -224,11 +229,12 @@ public class TestEventHubSystemConsumer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName), EVENTHUB_NAMESPACE);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
+    MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
 
     EventHubSystemConsumer consumer =
-            new EventHubSystemConsumer(new EventHubConfig(configMap), 
systemName, eventHubClientWrapperFactory, interceptor,
+            new EventHubSystemConsumer(new EventHubConfig(config), systemName, 
eventHubClientWrapperFactory, interceptor,
                     testMetrics);
     consumer.register(ssp1, EventHubSystemConsumer.START_OF_STREAM);
     consumer.register(ssp2, EventHubSystemConsumer.START_OF_STREAM);
@@ -284,11 +290,12 @@ public class TestEventHubSystemConsumer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
systemName, streamName2), EVENTHUB_NAMESPACE);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName2), EVENTHUB_KEY_NAME);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName2), EVENTHUB_KEY);
+    MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
 
     EventHubSystemConsumer consumer =
-            new EventHubSystemConsumer(new EventHubConfig(configMap), 
systemName, eventHubClientWrapperFactory, interceptor,
+            new EventHubSystemConsumer(new EventHubConfig(config), systemName, 
eventHubClientWrapperFactory, interceptor,
                     testMetrics);
 
     consumer.register(ssp1, EventHubSystemConsumer.START_OF_STREAM);

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
index cc40198..32b1604 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java
@@ -30,12 +30,14 @@ import org.apache.samza.system.eventhub.*;
 import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
 
+@Ignore("Requires Azure account credentials")
 public class ITestEventHubSystemProducer {
   private static final Logger LOG = 
LoggerFactory.getLogger(ITestEventHubSystemProducer.class.getName());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index 10016ec..ef73775 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -20,6 +20,7 @@
 package org.apache.samza.system.eventhub.producer;
 
 import com.microsoft.azure.eventhubs.*;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.eventhub.EventHubConfig;
@@ -40,9 +41,12 @@ import java.util.stream.Collectors;
 import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({EventHubRuntimeInformation.class, EventHubClient.class, 
PartitionReceiver.class, PartitionSender.class})
+@PrepareForTest({EventHubRuntimeInformation.class, 
EventHubPartitionRuntimeInformation.class,
+        EventHubClient.class, PartitionReceiver.class, PartitionSender.class})
 public class TestEventHubSystemProducer {
 
+  private static final String SOURCE = "TestEventHubSystemProducer";
+
   private static List<String> generateMessages(int numMsg) {
     Random rand = new Random(System.currentTimeMillis());
     List<String> messages = new ArrayList<>();
@@ -76,20 +80,21 @@ public class TestEventHubSystemProducer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), EVENTHUB_ENTITY1);
     
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, 
systemName),
             PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+    MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory factory = new 
MockEventHubClientManagerFactory();
 
     EventHubSystemProducer producer =
-            new EventHubSystemProducer(new EventHubConfig(configMap), 
systemName, factory, interceptor, testMetrics);
+            new EventHubSystemProducer(new EventHubConfig(config), systemName, 
factory, interceptor, testMetrics);
 
     SystemStream systemStream = new SystemStream(systemName, streamName);
-    producer.register(streamName);
+    producer.register(SOURCE);
     producer.start();
 
     outgoingMessagesP0.forEach(message ->
-            producer.send(streamName, new 
OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, 
partitionId0, null, message.getBytes())));
     outgoingMessagesP1.forEach(message ->
-            producer.send(streamName, new 
OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
+            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, 
partitionId1, null, message.getBytes())));
 
     // Retrieve sent data
     List<String> receivedData0 = factory.getSentData(systemName, streamName, 
partitionId0)
@@ -126,20 +131,21 @@ public class TestEventHubSystemProducer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), EVENTHUB_ENTITY1);
     
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, 
systemName),
             PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
+    MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory factory = new 
MockEventHubClientManagerFactory();
 
     EventHubSystemProducer producer =
-            new EventHubSystemProducer(new EventHubConfig(configMap), 
systemName, factory, interceptors, testMetrics);
+            new EventHubSystemProducer(new EventHubConfig(config), systemName, 
factory, interceptors, testMetrics);
 
     SystemStream systemStream = new SystemStream(systemName, streamName);
-    producer.register(streamName);
+    producer.register(SOURCE);
     producer.start();
 
     outgoingMessagesP0.forEach(message ->
-            producer.send(streamName, new 
OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, 
partitionId0, null, message.getBytes())));
     outgoingMessagesP1.forEach(message ->
-            producer.send(streamName, new 
OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
+            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, 
partitionId1, null, message.getBytes())));
 
     // Retrieve sent data
     List<String> receivedData0 = factory.getSentData(systemName, streamName, 
partitionId0)
@@ -180,23 +186,25 @@ public class TestEventHubSystemProducer {
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
systemName, streamName), EVENTHUB_KEY_NAME);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
systemName, streamName), EVENTHUB_KEY);
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
systemName, streamName), EVENTHUB_ENTITY1);
+
     // mod 2 on the partitionid to simulate consistent hashing
     
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, 
systemName),
             PartitioningMethod.EVENT_HUB_HASHING.toString());
+    MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory factory = new 
MockEventHubClientManagerFactory();
 
     EventHubSystemProducer producer =
-            new EventHubSystemProducer(new EventHubConfig(configMap), 
systemName, factory, interceptor, testMetrics);
+            new EventHubSystemProducer(new EventHubConfig(config), systemName, 
factory, interceptor, testMetrics);
 
     SystemStream systemStream = new SystemStream(systemName, streamName);
-    producer.register(streamName);
+    producer.register(SOURCE);
     producer.start();
 
     outgoingMessagesP0.forEach(message ->
-            producer.send(streamName, new 
OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes())));
+            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, 
partitionId0, null, message.getBytes())));
     outgoingMessagesP1.forEach(message ->
-            producer.send(streamName, new 
OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes())));
+            producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, 
partitionId1, null, message.getBytes())));
 
     // Retrieve sent data
     List<String> receivedData0 = factory.getSentData(systemName, streamName, 0)

Reply via email to