Repository: samza Updated Branches: refs/heads/master e74998c5e -> 1d420e750
SAMZA-1532; Eventhub connector fix Key fixes vjagadish1989 lhaiesp srinipunuru - Switched Producer source vs destination assumptions in `send`, `register` - Check `OME.key` if `OME.partitionId` is null to get partitionId - Upcoming offset changed to `END_OF_STREAM` rather than `newestOffset` + 1, eventHub returns an error if the offset does not exist in the system - Made NewestOffset+1 the upcoming offset, consumer checks if the offset is valid on startup - Differentiated between streamNames and streamIds in configs, consumer, producer - Checkpoint table named after job name - Checkpoint prints better message for invalid key on write QOL - How to ignore integration tests - Improved logging EDIT: - Also added Round Robin producer partitioning Author: Daniel Chen <[email protected]> Reviewers: Jagadish<[email protected]> Closes #377 from dxichen/eventhub-connector-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1d420e75 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1d420e75 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1d420e75 Branch: refs/heads/master Commit: 1d420e750eb8dc7f0715f7e11e01af1e9ce61506 Parents: e74998c Author: Daniel Chen <[email protected]> Authored: Mon Dec 11 16:45:44 2017 -0800 Committer: Jagadish <[email protected]> Committed: Mon Dec 11 16:45:44 2017 -0800 ---------------------------------------------------------------------- build.gradle | 5 - .../azure/AzureCheckpointManager.java | 31 +++++- .../azure/AzureCheckpointManagerFactory.java | 4 +- .../samza/system/eventhub/EventHubConfig.java | 71 ++++++++++--- .../eventhub/SamzaEventHubClientManager.java | 4 +- .../eventhub/admin/EventHubSystemAdmin.java | 17 +-- .../consumer/EventHubSystemConsumer.java | 77 ++++++++++---- .../producer/EventHubSystemProducer.java | 103 ++++++++++--------- .../azure/ITestAzureCheckpointManager.java | 2 +- .../MockEventHubClientManagerFactory.java | 18 ++++ 
.../eventhub/admin/TestEventHubSystemAdmin.java | 15 +-- .../consumer/ITestEventHubSystemConsumer.java | 5 +- .../consumer/TestEventHubSystemConsumer.java | 19 ++-- .../producer/ITestEventHubSystemProducer.java | 2 + .../producer/TestEventHubSystemProducer.java | 34 +++--- 15 files changed, 272 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index be1baf7..330ff0b 100644 --- a/build.gradle +++ b/build.gradle @@ -213,11 +213,6 @@ project(':samza-azure') { configFile = new File(rootDir, "checkstyle/checkstyle.xml") toolVersion = "$checkstyleVersion" } - test { - // Exclude integration tests that require connection to EventHub - exclude 'org/apache/samza/system/eventhub/producer/*ITest*' - exclude 'org/apache/samza/system/eventhub/consumer/*ITest*' - } } project(':samza-aws') { http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java index df3e490..2cad3bd 100644 --- a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java +++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java @@ -34,6 +34,7 @@ import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.SystemStreamPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; import java.net.URISyntaxException; import java.util.HashMap; @@ -60,12 +61,16 @@ public class AzureCheckpointManager implements CheckpointManager 
{ private static final Logger LOG = LoggerFactory.getLogger(AzureCheckpointManager.class.getName()); private static final String PARTITION_KEY = "PartitionKey"; + // Invalid characters in key field on Azure Table + public static final String REGEX_INVALID_KEY = ".*[#?/\\\\].*"; + public static final String REGEX_TABLE_NAME = "[^A-Za-z0-9]"; + public static final int MAX_WRITE_BATCH_SIZE = 100; - public static final String CHECKPOINT_MANAGER_TABLE_NAME = "SamzaTaskCheckpoints"; public static final String SYSTEM_PROP_NAME = "system"; public static final String STREAM_PROP_NAME = "stream"; public static final String PARTITION_PROP_NAME = "partition"; + private final String jobTableName; private final String storageConnectionString; private final AzureClient azureClient; private CloudTable cloudTable; @@ -73,7 +78,12 @@ public class AzureCheckpointManager implements CheckpointManager { private final Set<TaskName> taskNames = new HashSet<>(); private final JsonSerdeV2<Map<String, String>> jsonSerde = new JsonSerdeV2<>(); - AzureCheckpointManager(AzureConfig azureConfig) { + AzureCheckpointManager(AzureConfig azureConfig, Option<String> jobName) { + if (!jobName.isDefined()) { + throw new AzureException("Jobs must have a name to use Azure Checkpoint Manager"); + } + // Remove invalid characters + jobTableName = jobName.get().replaceAll(REGEX_TABLE_NAME, ""); storageConnectionString = azureConfig.getAzureConnectionString(); azureClient = new AzureClient(storageConnectionString); } @@ -82,7 +92,7 @@ public class AzureCheckpointManager implements CheckpointManager { public void start() { try { // Create the table if it doesn't exist. 
- cloudTable = azureClient.getTableClient().getTableReference(CHECKPOINT_MANAGER_TABLE_NAME); + cloudTable = azureClient.getTableClient().getTableReference(jobTableName); cloudTable.createIfNotExists(); } catch (URISyntaxException e) { @@ -115,9 +125,13 @@ public class AzureCheckpointManager implements CheckpointManager { SystemStreamPartition ssp = entry.getKey(); String offset = entry.getValue(); + String partitionKey = taskName.toString(); + checkValidKey(partitionKey, "Taskname"); + String rowKey = serializeSystemStreamPartition(ssp); + checkValidKey(rowKey, "SystemStreamPartition"); + // Create table entity - TaskCheckpointEntity taskCheckpoint = new TaskCheckpointEntity(taskName.toString(), - serializeSystemStreamPartition(ssp), offset); + TaskCheckpointEntity taskCheckpoint = new TaskCheckpointEntity(partitionKey, rowKey, offset); // Add to batch operation batchOperation.insertOrReplace(taskCheckpoint); @@ -135,6 +149,13 @@ public class AzureCheckpointManager implements CheckpointManager { } } + private void checkValidKey(String key, String fieldUsed) { + if (key == null || key.matches(REGEX_INVALID_KEY)) { + throw new AzureException(String.format("Cannot insert to Azure Checkpoint Manager; %s %s contains invalid characters [*, /, \\\\, ?]", + fieldUsed, key)); + } + } + private String serializeSystemStreamPartition(SystemStreamPartition ssp) { // Create the Json string for SystemStreamPartition Map<String, String> sspMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java index 3c5d62a..95728e3 100644 --- 
a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java +++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java @@ -23,11 +23,13 @@ import org.apache.samza.checkpoint.CheckpointManager; import org.apache.samza.checkpoint.CheckpointManagerFactory; import org.apache.samza.config.AzureConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.metrics.MetricsRegistry; public class AzureCheckpointManagerFactory implements CheckpointManagerFactory { @Override public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry) { - return new AzureCheckpointManager(new AzureConfig(config)); + JobConfig jobConfig = new JobConfig(config); + return new AzureCheckpointManager(new AzureConfig(config), jobConfig.getName()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java index 3bc04f8..5d83911 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java @@ -20,10 +20,15 @@ package org.apache.samza.system.eventhub; import com.microsoft.azure.eventhubs.EventHubClient; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; import org.apache.samza.system.eventhub.producer.EventHubSystemProducer; +import scala.collection.JavaConversions; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,7 +52,7 @@ public class 
EventHubConfig extends MapConfig { .PartitioningMethod.EVENT_HUB_HASHING.name(); public static final String CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = "systems.%s.eventhubs.send.key"; - public static final Boolean DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = false; + public static final Boolean DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = true; public static final String CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS = "systems.%s.eventhubs.runtime.info.timeout"; public static final long DEFAULT_CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis(); @@ -55,9 +60,47 @@ public class EventHubConfig extends MapConfig { public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = "systems.%s.eventhubs.receive.queue.size"; public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100; + private final Map<String, String> physcialToId = new HashMap<>(); - public EventHubConfig(Map<String, String> config) { + public EventHubConfig(Config config) { super(config); + + // Build reverse index for streamName -> streamId + StreamConfig streamConfig = new StreamConfig(config); + JavaConversions.asJavaCollection(streamConfig.getStreamIds()) + .forEach((streamId) -> physcialToId.put(streamConfig.getPhysicalName(streamId), streamId)); + } + + private String getFromStreamIdOrName(String configName, String systemName, String streamName, String defaultString) { + String result = getFromStreamIdOrName(configName, systemName, streamName); + if (result == null) { + return defaultString; + } + return result; + } + + private String getFromStreamIdOrName(String configName, String systemName, String streamName) { + String streamId = getStreamId(streamName); + return get(String.format(configName, systemName, streamId), + streamId.equals(streamName) ? 
null : get(String.format(configName, systemName, streamName))); + } + + private String validateRequiredConfig(String value, String fieldName, String systemName, String streamName) { + if (value == null) { + throw new SamzaException(String.format("Missing %s configuration for system: %s, stream: %s", + fieldName, systemName, streamName)); + } + return value; + } + + /** + * Get the streamId for the specified streamName + * + * @param streamName the physical identifier of a stream + * @return the streamId identifier for the stream or the queried streamName if it is not found. + */ + public String getStreamId(String streamName) { + return physcialToId.getOrDefault(streamName, streamName); } /** @@ -75,55 +118,59 @@ public class EventHubConfig extends MapConfig { * Get the EventHubs namespace for the stream * * @param systemName name of the system - * @param streamName name of stream + * @param streamName name of stream (physical or streamId) * @return EventHubs namespace */ public String getStreamNamespace(String systemName, String streamName) { - return get(String.format(CONFIG_STREAM_NAMESPACE, systemName, streamName)); + return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, systemName, streamName), + "Namespace", systemName, streamName); } /** * Get the EventHubs entity path (topic name) for the stream * * @param systemName name of the system - * @param streamName name of stream + * @param streamName name of stream (physical or streamId) * @return EventHubs entity path */ public String getStreamEntityPath(String systemName, String streamName) { - return get(String.format(CONFIG_STREAM_ENTITYPATH, systemName, streamName)); + return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, systemName, streamName), + "EntityPath", systemName, streamName); } /** * Get the EventHubs SAS (Shared Access Signature) key name for the stream * * @param systemName name of the system - * @param streamName name of stream + * @param streamName 
name of stream (physical or streamId) * @return EventHubs SAS key name */ public String getStreamSasKeyName(String systemName, String streamName) { - return get(String.format(CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName)); + return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), + "SASKeyName", systemName, streamName); } /** * Get the EventHubs SAS (Shared Access Signature) token for the stream * * @param systemName name of the system - * @param streamName name of stream + * @param streamName name of stream (physical or streamId) * @return EventHubs SAS token */ public String getStreamSasToken(String systemName, String streamName) { - return get(String.format(CONFIG_STREAM_SAS_TOKEN, systemName, streamName)); + return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, systemName, streamName), + "SASToken", systemName, streamName); } /** * Get the EventHubs consumer group used for consumption for the stream * * @param systemName name of the system - * @param streamName name of stream + * @param streamName name of stream (physical or streamId) * @return EventHubs consumer group */ public String getStreamConsumerGroup(String systemName, String streamName) { - return get(String.format(CONFIG_STREAM_CONSUMER_GROUP, systemName, streamName), DEFAULT_CONFIG_STREAM_CONSUMER_GROUP); + return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, systemName, streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java index ada5694..977e252 100644 --- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java @@ -75,8 +75,8 @@ public class SamzaEventHubClientManager implements EventHubClientManager { eventHubClient = EventHubClient.createFromConnectionStringSync(connectionStringBuilder.toString(), retryPolicy); } catch (IOException | ServiceBusException e) { - String msg = String.format("Creation of EventHub client failed for eventHub %s %s %s %s on remote host %s:%d", - entityPath, eventHubNamespace, sasKeyName, sasKey, remoteHost, ClientConstants.AMQPS_PORT); + String msg = String.format("Creation of EventHub client failed for eventHub EntityPath: %s on remote host %s:%d", + entityPath, remoteHost, ClientConstants.AMQPS_PORT); LOG.error(msg, e); throw new SamzaException(msg, e); } http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java index 11998a4..91d504c 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java @@ -63,8 +63,8 @@ public class EventHubSystemAdmin implements SystemAdmin { private String getNextOffset(String currentOffset) { // EventHub will return the first message AFTER the offset // that was specified in the fetch request. - return currentOffset.equals(EventHubSystemConsumer.END_OF_STREAM) ? currentOffset : - String.valueOf(Long.parseLong(currentOffset) + 1); + // If no such offset exists Eventhub will return an error. 
+ return String.valueOf(Long.parseLong(currentOffset) + 1); } @Override @@ -158,6 +158,7 @@ public class EventHubSystemAdmin implements SystemAdmin { long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName); EventHubPartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS); + // Set offsets String startingOffset = EventHubSystemConsumer.START_OF_STREAM; String newestOffset = ehPartitionInfo.getLastEnqueuedOffset(); String upcomingOffset = getNextOffset(newestOffset); @@ -181,12 +182,14 @@ public class EventHubSystemAdmin implements SystemAdmin { if (offset1 == null || offset2 == null) { return null; } + // Should NOT be able to compare with END_OF_STREAM to allow new offsets to be + // considered caught up if stream started at END_OF_STREAM offset + if (EventHubSystemConsumer.END_OF_STREAM.equals(offset1) || + EventHubSystemConsumer.END_OF_STREAM.equals(offset2)) { + return null; + } try { - if (offset1.equals(EventHubSystemConsumer.END_OF_STREAM)) { - return offset2.equals(EventHubSystemConsumer.END_OF_STREAM) ? 0 : 1; - } - return offset2.equals(EventHubSystemConsumer.END_OF_STREAM) ? 
-1 : - Long.compare(Long.parseLong(offset1), Long.parseLong(offset2)); + return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2)); } catch (NumberFormatException exception) { return null; } http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java index 4de34de..90c73dc 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java @@ -20,6 +20,7 @@ package org.apache.samza.system.eventhub.consumer; import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation; import com.microsoft.azure.eventhubs.PartitionReceiveHandler; import com.microsoft.azure.eventhubs.PartitionReceiver; import com.microsoft.azure.servicebus.ServiceBusException; @@ -34,6 +35,7 @@ import org.apache.samza.system.eventhub.EventHubConfig; import org.apache.samza.system.eventhub.Interceptor; import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin; import org.apache.samza.system.eventhub.metrics.SamzaHistogram; +import org.apache.samza.system.eventhub.producer.EventHubSystemProducer; import org.apache.samza.util.BlockingEnvelopeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,23 +142,23 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { this.config = config; this.systemName = systemName; this.interceptors = interceptors; - List<String> streamNames = config.getStreams(systemName); + List<String> streamIds = config.getStreams(systemName); // Create and initiate 
connections to Event Hubs - for (String streamName : streamNames) { + for (String streamId : streamIds) { EventHubClientManager eventHubClientManager = eventHubClientManagerFactory - .getEventHubClientManager(systemName, streamName, config); - streamEventHubManagers.put(streamName, eventHubClientManager); + .getEventHubClientManager(systemName, streamId, config); + streamEventHubManagers.put(streamId, eventHubClientManager); eventHubClientManager.init(); } // Initiate metrics - eventReadRates = streamNames.stream() + eventReadRates = streamIds.stream() .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE))); - eventByteReadRates = streamNames.stream() + eventByteReadRates = streamIds.stream() .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE))); - readLatencies = streamNames.stream() + readLatencies = streamIds.stream() .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY))); - readErrors = streamNames.stream() + readErrors = streamIds.stream() .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS))); // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers. 
@@ -174,20 +176,41 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { public void register(SystemStreamPartition systemStreamPartition, String offset) { super.register(systemStreamPartition, offset); + LOG.info(String.format("Eventhub consumer trying to register ssp %s, offset %s", systemStreamPartition, offset)); if (isStarted) { throw new SamzaException("Trying to add partition when the connection has already started."); } if (streamPartitionOffsets.containsKey(systemStreamPartition)) { + // Only update if new offset is lower than previous offset + if (END_OF_STREAM.equals(offset)) return; String prevOffset = streamPartitionOffsets.get(systemStreamPartition); - if (EventHubSystemAdmin.compareOffsets(offset, prevOffset) > -1) { - // Only update if new offset is lower than previous offset + if (!END_OF_STREAM.equals(prevOffset) && EventHubSystemAdmin.compareOffsets(offset, prevOffset) > -1) { return; } } streamPartitionOffsets.put(systemStreamPartition, offset); } + private String getNewestEventHubOffset(EventHubClientManager eventHubClientManager, String streamName, Integer partitionId) { + CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfoFuture = eventHubClientManager + .getEventHubClient() + .getPartitionRuntimeInformation(partitionId.toString()); + try { + long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName); + + EventHubPartitionRuntimeInformation partitionRuntimeInformation = partitionRuntimeInfoFuture + .get(timeoutMs, TimeUnit.MILLISECONDS); + + return partitionRuntimeInformation.getLastEnqueuedOffset(); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + String msg = String.format( + "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s", + systemName, streamName, partitionId); + throw new SamzaException(msg); + } + } + @Override public void start() { isStarted = true; @@ -196,27 +219,36 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap { SystemStreamPartition ssp = entry.getKey(); String streamName = ssp.getStream(); + String streamId = config.getStreamId(ssp.getStream()); Integer partitionId = ssp.getPartition().getPartitionId(); String offset = entry.getValue(); - String consumerGroup = config.getStreamConsumerGroup(systemName, streamName); - String namespace = config.getStreamNamespace(systemName, streamName); - String entityPath = config.getStreamEntityPath(systemName, streamName); - EventHubClientManager eventHubClientManager = streamEventHubManagers.get(streamName); + String consumerGroup = config.getStreamConsumerGroup(systemName, streamId); + String namespace = config.getStreamNamespace(systemName, streamId); + String entityPath = config.getStreamEntityPath(systemName, streamId); + EventHubClientManager eventHubClientManager = streamEventHubManagers.get(streamId); try { + // Fetch the newest offset + String newestEventHubOffset = getNewestEventHubOffset(eventHubClientManager, streamName, partitionId); PartitionReceiver receiver; - if (offset.equals(EventHubSystemConsumer.END_OF_STREAM)) { + if (END_OF_STREAM.equals(offset) || EventHubSystemAdmin.compareOffsets(newestEventHubOffset, offset) == -1) { + // If the offset is greater than the newest offset, use the use current Instant as + // offset to fetch in Eventhub. receiver = eventHubClientManager.getEventHubClient() .createReceiverSync(consumerGroup, partitionId.toString(), Instant.now()); } else { + // If the offset is less or equal to the newest offset in the system, it can be + // used as the starting offset to receive from. EventHub will return the first + // message AFTER the offset that was specified in the fetch request. + // If no such offset exists Eventhub will return an error. 
receiver = eventHubClientManager.getEventHubClient() .createReceiverSync(consumerGroup, partitionId.toString(), offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)); } - PartitionReceiveHandler handler = new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamName), - eventByteReadRates.get(streamName), readLatencies.get(streamName), readErrors.get(streamName), - interceptors.getOrDefault(streamName, null)); + PartitionReceiveHandler handler = new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId), + eventByteReadRates.get(streamId), readLatencies.get(streamId), readErrors.get(streamId), + interceptors.getOrDefault(streamId, null)); // Timeout for EventHubClient receive @@ -261,11 +293,11 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { } private void renewPartitionReceiver(SystemStreamPartition ssp) { - - EventHubClientManager eventHubClientManager = streamEventHubManagers.get(ssp.getStream()); + String streamId = config.getStreamId(ssp.getStream()); + EventHubClientManager eventHubClientManager = streamEventHubManagers.get(streamId); String offset = streamPartitionOffsets.get(ssp); Integer partitionId = ssp.getPartition().getPartitionId(); - String consumerGroup = config.getStreamConsumerGroup(ssp.getSystem(), ssp.getStream()); + String consumerGroup = config.getStreamConsumerGroup(ssp.getSystem(), streamId); try { // Close current receiver @@ -346,6 +378,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { } String offset = event.getSystemProperties().getOffset(); Object partitionKey = event.getSystemProperties().getPartitionKey(); + if (partitionKey == null) { + partitionKey = event.getProperties().get(EventHubSystemProducer.KEY); + } try { updateMetrics(event); http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java ---------------------------------------------------------------------- diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java index 505421c..f294751 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java @@ -58,6 +58,7 @@ public class EventHubSystemProducer implements SystemProducer { private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis(); public static final String PRODUCE_TIMESTAMP = "produce-timestamp"; + public static final String KEY = "key"; // Metrics recording private static final String EVENT_WRITE_RATE = "eventWriteRate"; @@ -75,8 +76,9 @@ public class EventHubSystemProducer implements SystemProducer { private static final Object AGGREGATE_METRICS_LOCK = new Object(); public enum PartitioningMethod { + ROUND_ROBIN, EVENT_HUB_HASHING, - PARTITION_KEY_AS_PARTITION, + PARTITION_KEY_AS_PARTITION } private final HashMap<String, Counter> eventWriteRate = new HashMap<>(); @@ -85,7 +87,6 @@ public class EventHubSystemProducer implements SystemProducer { private final HashMap<String, SamzaHistogram> sendCallbackLatency = new HashMap<>(); private final HashMap<String, Counter> sendErrors = new HashMap<>(); - private final EventHubClientManagerFactory eventHubClientManagerFactory; private final EventHubConfig config; private final MetricsRegistry registry; private final PartitioningMethod partitioningMethod; @@ -109,36 +110,35 @@ public class EventHubSystemProducer implements SystemProducer { this.registry = registry; this.systemName = systemName; this.partitioningMethod = config.getPartitioningMethod(systemName); - this.eventHubClientManagerFactory = eventHubClientManagerFactory; this.interceptors = interceptors; + + // Fetches the stream ids + List<String> streamIds = config.getStreams(systemName); + + // 
Create and initiate connections to Event Hubs + for (String streamId : streamIds) { + EventHubClientManager ehClient = eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config); + eventHubClients.put(streamId, ehClient); + ehClient.init(); + } } @Override - public synchronized void register(String streamName) { - LOG.debug("Trying to register {}.", streamName); + public synchronized void register(String source) { if (isStarted) { String msg = "Cannot register once the producer is started."; throw new SamzaException(msg); } - - if (eventHubClients.containsKey(streamName)) { - LOG.warn("Already registered stream {}.", streamName); - return; - } - - EventHubClientManager ehClient = eventHubClientManagerFactory.getEventHubClientManager(systemName, streamName, config); - - ehClient.init(); - eventHubClients.put(streamName, ehClient); } @Override public synchronized void start() { LOG.debug("Starting system producer."); + // Create partition senders if required if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) { // Create all partition senders - eventHubClients.forEach((streamName, samzaEventHubClient) -> { + eventHubClients.forEach((streamId, samzaEventHubClient) -> { EventHubClient ehClient = samzaEventHubClient.getEventHubClient(); try { @@ -153,7 +153,7 @@ public class EventHubSystemProducer implements SystemProducer { partitionSenders.put(i, partitionSender); } - streamPartitionSenders.put(streamName, partitionSenders); + streamPartitionSenders.put(streamId, partitionSenders); } catch (InterruptedException | ExecutionException | TimeoutException e) { String msg = "Failed to fetch number of Event Hub partitions for partition sender creation"; throw new SamzaException(msg, e); @@ -164,15 +164,16 @@ public class EventHubSystemProducer implements SystemProducer { }); } - for (String eventHub : eventHubClients.keySet()) { - eventWriteRate.put(eventHub, registry.newCounter(eventHub, EVENT_WRITE_RATE)); - 
eventByteWriteRate.put(eventHub, registry.newCounter(eventHub, EVENT_BYTE_WRITE_RATE)); - sendLatency.put(eventHub, new SamzaHistogram(registry, eventHub, SEND_LATENCY)); - sendCallbackLatency.put(eventHub, new SamzaHistogram(registry, eventHub, SEND_CALLBACK_LATENCY)); - sendErrors.put(eventHub, registry.newCounter(eventHub, SEND_ERRORS)); - } + // Initiate metrics + eventHubClients.keySet().forEach((streamId) -> { + eventWriteRate.put(streamId, registry.newCounter(streamId, EVENT_WRITE_RATE)); + eventByteWriteRate.put(streamId, registry.newCounter(streamId, EVENT_BYTE_WRITE_RATE)); + sendLatency.put(streamId, new SamzaHistogram(registry, streamId, SEND_LATENCY)); + sendCallbackLatency.put(streamId, new SamzaHistogram(registry, streamId, SEND_CALLBACK_LATENCY)); + sendErrors.put(streamId, registry.newCounter(streamId, SEND_ERRORS)); + }); - // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers. + // Locking to ensure that these aggregated metrics will be created only once across multiple system producers. 
synchronized (AGGREGATE_METRICS_LOCK) { if (aggEventWriteRate == null) { aggEventWriteRate = registry.newCounter(AGGREGATE, EVENT_WRITE_RATE); @@ -187,35 +188,38 @@ public class EventHubSystemProducer implements SystemProducer { } @Override - public synchronized void send(String destination, OutgoingMessageEnvelope envelope) { + public synchronized void send(String source, OutgoingMessageEnvelope envelope) { + LOG.debug(String.format("Trying to send %s", envelope)); if (!isStarted) { throw new SamzaException("Trying to call send before the producer is started."); } - if (!eventHubClients.containsKey(destination)) { - String msg = String.format("Trying to send event to a destination {%s} that is not registered.", destination); + String streamId = config.getStreamId(envelope.getSystemStream().getStream()); + + if (!eventHubClients.containsKey(streamId)) { + String msg = String.format("Trying to send event to a destination {%s} that is not registered.", streamId); throw new SamzaException(msg); } checkCallbackThrowable("Received exception on message send"); - EventData eventData = createEventData(destination, envelope); + EventData eventData = createEventData(streamId, envelope); int eventDataLength = eventData.getBytes() == null ? 
0 : eventData.getBytes().length; - eventWriteRate.get(destination).inc(); + eventWriteRate.get(streamId).inc(); aggEventWriteRate.inc(); - eventByteWriteRate.get(destination).inc(eventDataLength); + eventByteWriteRate.get(streamId).inc(eventDataLength); aggEventByteWriteRate.inc(eventDataLength); - EventHubClientManager ehClient = eventHubClients.get(destination); + EventHubClientManager ehClient = eventHubClients.get(streamId); long beforeSendTimeMs = System.currentTimeMillis(); // Async send call - CompletableFuture<Void> sendResult = sendToEventHub(destination, eventData, getEnvelopePartitionId(envelope), + CompletableFuture<Void> sendResult = sendToEventHub(streamId, eventData, getEnvelopePartitionId(envelope), ehClient.getEventHubClient()); long afterSendTimeMs = System.currentTimeMillis(); long latencyMs = afterSendTimeMs - beforeSendTimeMs; - sendLatency.get(destination).update(latencyMs); + sendLatency.get(streamId).update(latencyMs); aggSendLatency.update(latencyMs); pendingFutures.add(sendResult); @@ -223,32 +227,37 @@ public class EventHubSystemProducer implements SystemProducer { // Auto update the metrics and possible throwable when futures are complete. 
sendResult.handle((aVoid, throwable) -> { long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs; - sendCallbackLatency.get(destination).update(callbackLatencyMs); + sendCallbackLatency.get(streamId).update(callbackLatencyMs); aggSendCallbackLatency.update(callbackLatencyMs); if (throwable != null) { - sendErrors.get(destination).inc(); + sendErrors.get(streamId).inc(); aggSendErrors.inc(); - LOG.error("Send message to event hub: {} failed with exception: ", destination, throwable); + LOG.error("Send message to event hub: {} failed with exception: ", streamId, throwable); sendExceptionOnCallback.compareAndSet(null, throwable); } return aVoid; }); } - private CompletableFuture<Void> sendToEventHub(String streamName, EventData eventData, Object partitionKey, + private CompletableFuture<Void> sendToEventHub(String streamId, EventData eventData, Object partitionKey, EventHubClient eventHubClient) { - if (partitioningMethod == PartitioningMethod.EVENT_HUB_HASHING) { + if (PartitioningMethod.ROUND_ROBIN.equals(partitioningMethod)) { + return eventHubClient.send(eventData); + } else if (PartitioningMethod.EVENT_HUB_HASHING.equals(partitioningMethod)) { + if (partitionKey == null) { + throw new SamzaException("Partition key cannot be null for EventHub hashing"); + } return eventHubClient.send(eventData, convertPartitionKeyToString(partitionKey)); - } else if (partitioningMethod == PartitioningMethod.PARTITION_KEY_AS_PARTITION) { + } else if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) { if (!(partitionKey instanceof Integer)) { String msg = "Partition key should be of type Integer"; throw new SamzaException(msg); } - Integer numPartition = streamPartitionSenders.get(streamName).size(); + Integer numPartition = streamPartitionSenders.get(streamId).size(); Integer destinationPartition = (Integer) partitionKey % numPartition; - PartitionSender sender = streamPartitionSenders.get(streamName).get(destinationPartition); + 
PartitionSender sender = streamPartitionSenders.get(streamId).get(destinationPartition); return sender.send(eventData); } else { throw new SamzaException("Unknown partitioning method " + partitioningMethod); @@ -256,7 +265,7 @@ public class EventHubSystemProducer implements SystemProducer { } protected Object getEnvelopePartitionId(OutgoingMessageEnvelope envelope) { - return envelope.getPartitionKey(); + return envelope.getPartitionKey() == null ? envelope.getKey() : envelope.getPartitionKey(); } private String convertPartitionKeyToString(Object partitionKey) { @@ -267,12 +276,12 @@ public class EventHubSystemProducer implements SystemProducer { } else if (partitionKey instanceof byte[]) { return new String((byte[]) partitionKey, Charset.defaultCharset()); } else { - throw new SamzaException("Unsupported key type: " + partitionKey.getClass().toString()); + throw new SamzaException("Unsupported key type: " + partitionKey.getClass().toString()); } } - protected EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) { - Optional<Interceptor> interceptor = Optional.ofNullable(interceptors.getOrDefault(streamName, null)); + protected EventData createEventData(String streamId, OutgoingMessageEnvelope envelope) { + Optional<Interceptor> interceptor = Optional.ofNullable(interceptors.getOrDefault(streamId, null)); byte[] eventValue = (byte[]) envelope.getMessage(); if (interceptor.isPresent()) { eventValue = interceptor.get().intercept(eventValue); @@ -288,7 +297,7 @@ public class EventHubSystemProducer implements SystemProducer { keyValue = (envelope.getKey() instanceof byte[]) ? 
new String((byte[]) envelope.getKey()) : envelope.getKey().toString(); } - eventData.getProperties().put("key", keyValue); + eventData.getProperties().put(KEY, keyValue); } return eventData; } http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java index 3e5ead0..4560b11 100644 --- a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java +++ b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java @@ -34,7 +34,7 @@ import org.junit.*; import java.util.HashMap; import java.util.Map; -@Ignore("Intergration Test") +@Ignore("Requires Azure account credentials") public class ITestAzureCheckpointManager { private static String storageConnectionString = ""; http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java index 1f06f7d..368087a 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java @@ -101,6 +101,10 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto } return null; }); + EventHubPartitionRuntimeInformation mockPartitionRuntimeInfo = 
PowerMockito.mock(EventHubPartitionRuntimeInformation.class); + PowerMockito.when(mockPartitionRuntimeInfo.getLastEnqueuedOffset()) + .thenReturn(EventHubSystemConsumer.START_OF_STREAM); + CompletableFuture<EventHubPartitionRuntimeInformation> partitionFuture = new MockPartitionFuture(mockPartitionRuntimeInfo); // Producer mocks PartitionSender mockPartitionSender0 = PowerMockito.mock(PartitionSender.class); @@ -137,6 +141,7 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto startingOffsets.put(partitionId, offset); return mockPartitionReceiver; }); + PowerMockito.when(mockEventHubClient.getPartitionRuntimeInformation(anyString())).thenReturn(partitionFuture); // Producer calls PowerMockito.when(mockEventHubClient.createPartitionSenderSync("0")).thenReturn(mockPartitionSender0); @@ -191,6 +196,19 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto } } + private class MockPartitionFuture extends CompletableFuture<EventHubPartitionRuntimeInformation> { + EventHubPartitionRuntimeInformation runtimeInformation; + + MockPartitionFuture(EventHubPartitionRuntimeInformation runtimeInformation) { + this.runtimeInformation = runtimeInformation; + } + + @Override + public EventHubPartitionRuntimeInformation get(long timeout, TimeUnit unit) { + return runtimeInformation; + } + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java index c49e68e..8861152 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java +++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java @@ -49,14 +49,8 @@ public class TestEventHubSystemAdmin { Assert.assertEquals(0, eventHubSystemAdmin.offsetComparator("150", "150").intValue()); Assert.assertEquals(1, eventHubSystemAdmin.offsetComparator("200", "100").intValue()); Assert.assertNull(eventHubSystemAdmin.offsetComparator("1", "a")); - Assert.assertEquals(-1, eventHubSystemAdmin - .offsetComparator("100", EventHubSystemConsumer.END_OF_STREAM).intValue()); - Assert.assertEquals(0, eventHubSystemAdmin.offsetComparator(EventHubSystemConsumer.END_OF_STREAM, - EventHubSystemConsumer.END_OF_STREAM).intValue()); - Assert.assertEquals(1, eventHubSystemAdmin - .offsetComparator(EventHubSystemConsumer.END_OF_STREAM, "100").intValue()); - Assert.assertEquals(-1, eventHubSystemAdmin - .offsetComparator(EventHubSystemConsumer.START_OF_STREAM, "10").intValue()); + Assert.assertNull(eventHubSystemAdmin.offsetComparator("100", EventHubSystemConsumer.END_OF_STREAM)); + Assert.assertNull(eventHubSystemAdmin.offsetComparator(EventHubSystemConsumer.END_OF_STREAM, EventHubSystemConsumer.END_OF_STREAM)); } @Test @@ -66,16 +60,13 @@ public class TestEventHubSystemAdmin { MockEventHubConfigFactory.getEventHubConfig(EventHubSystemProducer.PartitioningMethod.EVENT_HUB_HASHING)); Map<SystemStreamPartition, String> offsets = new HashMap<>(); SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(0)); - SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(1)); SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME1, new Partition(2)); offsets.put(ssp0, Integer.toString(0)); - offsets.put(ssp1, EventHubSystemConsumer.END_OF_STREAM); offsets.put(ssp2, EventHubSystemConsumer.START_OF_STREAM); Map<SystemStreamPartition, String> updatedOffsets = eventHubSystemAdmin.getOffsetsAfter(offsets); Assert.assertEquals(offsets.size(), 
updatedOffsets.size()); Assert.assertEquals("1", updatedOffsets.get(ssp0)); - Assert.assertEquals("-2", updatedOffsets.get(ssp1)); Assert.assertEquals("0", updatedOffsets.get(ssp2)); } @@ -102,8 +93,6 @@ public class TestEventHubSystemAdmin { partitionMetadataMap.forEach((partition, metadata) -> { Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, metadata.getOldestOffset()); Assert.assertNotSame(EventHubSystemConsumer.END_OF_STREAM, metadata.getNewestOffset()); - Assert.assertTrue(Long.parseLong(EventHubSystemConsumer.END_OF_STREAM) - <= Long.parseLong(metadata.getNewestOffset())); String expectedUpcomingOffset = String.valueOf(Long.parseLong(metadata.getNewestOffset()) + 1); Assert.assertEquals(expectedUpcomingOffset, metadata.getUpcomingOffset()); }); http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java index dbf8807..cfd8217 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/ITestEventHubSystemConsumer.java @@ -29,14 +29,15 @@ import org.apache.samza.system.eventhub.MockEventHubConfigFactory; import org.apache.samza.system.eventhub.TestMetricsRegistry; import org.apache.samza.system.eventhub.producer.EventHubSystemProducer; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; -import java.util.Collections; -import java.util.List; +import java.util.*; import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.STREAM_NAME1; import static 
org.apache.samza.system.eventhub.MockEventHubConfigFactory.SYSTEM_NAME; +@Ignore("Requires Azure account credentials") public class ITestEventHubSystemConsumer { private Config createEventHubConfig() { http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java index a25a3b6..865a248 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java @@ -22,6 +22,7 @@ package org.apache.samza.system.eventhub.consumer; import com.microsoft.azure.eventhubs.*; import org.apache.samza.Partition; +import org.apache.samza.config.MapConfig; import org.apache.samza.metrics.Counter; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; @@ -41,7 +42,8 @@ import java.util.stream.Collectors; import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*; @RunWith(PowerMockRunner.class) -@PrepareForTest({EventHubRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class}) +@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class, + EventHubClient.class, PartitionReceiver.class, PartitionSender.class}) public class TestEventHubSystemConsumer { private static final String MOCK_ENTITY_1 = "mocktopic1"; private static final String MOCK_ENTITY_2 = "mocktopic2"; @@ -85,11 +87,12 @@ public class TestEventHubSystemConsumer { configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), 
EVENTHUB_KEY_NAME); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1); + MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); EventHubSystemConsumer consumer = - new EventHubSystemConsumer(new EventHubConfig(configMap), systemName, eventHubClientWrapperFactory, interceptors, + new EventHubSystemConsumer(new EventHubConfig(config), systemName, eventHubClientWrapperFactory, interceptors, testMetrics); consumer.register(ssp, "1"); consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM); @@ -124,11 +127,12 @@ public class TestEventHubSystemConsumer { configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1); + MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); EventHubSystemConsumer consumer = - new EventHubSystemConsumer(new EventHubConfig(configMap), systemName, eventHubClientWrapperFactory, interceptors, + new EventHubSystemConsumer(new EventHubConfig(config), systemName, eventHubClientWrapperFactory, interceptors, testMetrics); consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM); consumer.start(); @@ -173,11 +177,12 @@ public class TestEventHubSystemConsumer { configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); 
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1); + MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); EventHubSystemConsumer consumer = - new EventHubSystemConsumer(new EventHubConfig(configMap), systemName, eventHubClientWrapperFactory, interceptors, + new EventHubSystemConsumer(new EventHubConfig(config), systemName, eventHubClientWrapperFactory, interceptors, testMetrics); consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM); consumer.start(); @@ -224,11 +229,12 @@ public class TestEventHubSystemConsumer { configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); + MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); EventHubSystemConsumer consumer = - new EventHubSystemConsumer(new EventHubConfig(configMap), systemName, eventHubClientWrapperFactory, interceptor, + new EventHubSystemConsumer(new EventHubConfig(config), systemName, eventHubClientWrapperFactory, interceptor, testMetrics); consumer.register(ssp1, EventHubSystemConsumer.START_OF_STREAM); consumer.register(ssp2, EventHubSystemConsumer.START_OF_STREAM); @@ -284,11 +290,12 @@ public class TestEventHubSystemConsumer { configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName2), EVENTHUB_NAMESPACE); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName2), EVENTHUB_KEY_NAME); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName2), EVENTHUB_KEY); 
+ MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); EventHubSystemConsumer consumer = - new EventHubSystemConsumer(new EventHubConfig(configMap), systemName, eventHubClientWrapperFactory, interceptor, + new EventHubSystemConsumer(new EventHubConfig(config), systemName, eventHubClientWrapperFactory, interceptor, testMetrics); consumer.register(ssp1, EventHubSystemConsumer.START_OF_STREAM); http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java index cc40198..32b1604 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java @@ -30,12 +30,14 @@ import org.apache.samza.system.eventhub.*; import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*; +@Ignore("Requires Azure account credentials") public class ITestEventHubSystemProducer { private static final Logger LOG = LoggerFactory.getLogger(ITestEventHubSystemProducer.class.getName()); http://git-wip-us.apache.org/repos/asf/samza/blob/1d420e75/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java 
---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java index 10016ec..ef73775 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java @@ -20,6 +20,7 @@ package org.apache.samza.system.eventhub.producer; import com.microsoft.azure.eventhubs.*; +import org.apache.samza.config.MapConfig; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.system.eventhub.EventHubConfig; @@ -40,9 +41,12 @@ import java.util.stream.Collectors; import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*; @RunWith(PowerMockRunner.class) -@PrepareForTest({EventHubRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class}) +@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class, + EventHubClient.class, PartitionReceiver.class, PartitionSender.class}) public class TestEventHubSystemProducer { + private static final String SOURCE = "TestEventHubSystemProducer"; + private static List<String> generateMessages(int numMsg) { Random rand = new Random(System.currentTimeMillis()); List<String> messages = new ArrayList<>(); @@ -76,20 +80,21 @@ public class TestEventHubSystemProducer { configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1); configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString()); + MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory factory = new 
MockEventHubClientManagerFactory(); EventHubSystemProducer producer = - new EventHubSystemProducer(new EventHubConfig(configMap), systemName, factory, interceptor, testMetrics); + new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptor, testMetrics); SystemStream systemStream = new SystemStream(systemName, streamName); - producer.register(streamName); + producer.register(SOURCE); producer.start(); outgoingMessagesP0.forEach(message -> - producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes()))); + producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes()))); outgoingMessagesP1.forEach(message -> - producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes()))); + producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes()))); // Retrieve sent data List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0) @@ -126,20 +131,21 @@ public class TestEventHubSystemProducer { configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1); configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString()); + MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory(); EventHubSystemProducer producer = - new EventHubSystemProducer(new EventHubConfig(configMap), systemName, factory, interceptors, testMetrics); + new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptors, testMetrics); SystemStream systemStream = new SystemStream(systemName, streamName); - producer.register(streamName); + producer.register(SOURCE); producer.start(); outgoingMessagesP0.forEach(message -> - producer.send(streamName, new 
OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes()))); + producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes()))); outgoingMessagesP1.forEach(message -> - producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes()))); + producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes()))); // Retrieve sent data List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0) @@ -180,23 +186,25 @@ public class TestEventHubSystemProducer { configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1); + // mod 2 on the partitionid to simulate consistent hashing configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), PartitioningMethod.EVENT_HUB_HASHING.toString()); + MapConfig config = new MapConfig(configMap); MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory(); EventHubSystemProducer producer = - new EventHubSystemProducer(new EventHubConfig(configMap), systemName, factory, interceptor, testMetrics); + new EventHubSystemProducer(new EventHubConfig(config), systemName, factory, interceptor, testMetrics); SystemStream systemStream = new SystemStream(systemName, streamName); - producer.register(streamName); + producer.register(SOURCE); producer.start(); outgoingMessagesP0.forEach(message -> - producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes()))); + producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes()))); outgoingMessagesP1.forEach(message -> - 
producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes()))); + producer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes()))); // Retrieve sent data List<String> receivedData0 = factory.getSentData(systemName, streamName, 0)
